chiark / gitweb /
bus: add the ability for backends to queue to input messages at the same time
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49 #include "bus-container.h"
50
51 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
52
53 static void bus_close_fds(sd_bus *b) {
54         assert(b);
55
56         if (b->input_fd >= 0)
57                 close_nointr_nofail(b->input_fd);
58
59         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
60                 close_nointr_nofail(b->output_fd);
61
62         b->input_fd = b->output_fd = -1;
63 }
64
65 static void bus_node_destroy(sd_bus *b, struct node *n) {
66         struct node_callback *c;
67         struct node_vtable *v;
68         struct node_enumerator *e;
69
70         assert(b);
71
72         if (!n)
73                 return;
74
75         while (n->child)
76                 bus_node_destroy(b, n->child);
77
78         while ((c = n->callbacks)) {
79                 LIST_REMOVE(callbacks, n->callbacks, c);
80                 free(c);
81         }
82
83         while ((v = n->vtables)) {
84                 LIST_REMOVE(vtables, n->vtables, v);
85                 free(v->interface);
86                 free(v);
87         }
88
89         while ((e = n->enumerators)) {
90                 LIST_REMOVE(enumerators, n->enumerators, e);
91                 free(e);
92         }
93
94         if (n->parent)
95                 LIST_REMOVE(siblings, n->parent->child, n);
96
97         assert_se(hashmap_remove(b->nodes, n->path) == n);
98         free(n->path);
99         free(n);
100 }
101
102 static void bus_reset_queues(sd_bus *b) {
103         unsigned i;
104
105         assert(b);
106
107         for (i = 0; i < b->rqueue_size; i++)
108                 sd_bus_message_unref(b->rqueue[i]);
109         free(b->rqueue);
110
111         for (i = 0; i < b->wqueue_size; i++)
112                 sd_bus_message_unref(b->wqueue[i]);
113         free(b->wqueue);
114
115         b->rqueue = b->wqueue = NULL;
116         b->rqueue_size = b->wqueue_size = 0;
117 }
118
119 static void bus_free(sd_bus *b) {
120         struct filter_callback *f;
121         struct node *n;
122
123         assert(b);
124
125         sd_bus_detach_event(b);
126
127         bus_close_fds(b);
128
129         if (b->kdbus_buffer)
130                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
131
132         free(b->rbuffer);
133         free(b->unique_name);
134         free(b->auth_buffer);
135         free(b->address);
136         free(b->kernel);
137         free(b->machine);
138
139         free(b->exec_path);
140         strv_free(b->exec_argv);
141
142         close_many(b->fds, b->n_fds);
143         free(b->fds);
144
145         bus_reset_queues(b);
146
147         hashmap_free_free(b->reply_callbacks);
148         prioq_free(b->reply_callbacks_prioq);
149
150         while ((f = b->filter_callbacks)) {
151                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
152                 free(f);
153         }
154
155         bus_match_free(&b->match_callbacks);
156
157         hashmap_free_free(b->vtable_methods);
158         hashmap_free_free(b->vtable_properties);
159
160         while ((n = hashmap_first(b->nodes)))
161                 bus_node_destroy(b, n);
162
163         hashmap_free(b->nodes);
164
165         bus_kernel_flush_memfd(b);
166
167         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
168
169         free(b);
170 }
171
172 _public_ int sd_bus_new(sd_bus **ret) {
173         sd_bus *r;
174
175         assert_return(ret, -EINVAL);
176
177         r = new0(sd_bus, 1);
178         if (!r)
179                 return -ENOMEM;
180
181         r->n_ref = REFCNT_INIT;
182         r->input_fd = r->output_fd = -1;
183         r->message_version = 1;
184         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
185         r->attach_flags |= KDBUS_ATTACH_NAMES;
186         r->original_pid = getpid();
187
188         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
189
190         /* We guarantee that wqueue always has space for at least one
191          * entry */
192         r->wqueue = new(sd_bus_message*, 1);
193         if (!r->wqueue) {
194                 free(r);
195                 return -ENOMEM;
196         }
197
198         *ret = r;
199         return 0;
200 }
201
202 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
203         char *a;
204
205         assert_return(bus, -EINVAL);
206         assert_return(bus->state == BUS_UNSET, -EPERM);
207         assert_return(address, -EINVAL);
208         assert_return(!bus_pid_changed(bus), -ECHILD);
209
210         a = strdup(address);
211         if (!a)
212                 return -ENOMEM;
213
214         free(bus->address);
215         bus->address = a;
216
217         return 0;
218 }
219
220 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
221         assert_return(bus, -EINVAL);
222         assert_return(bus->state == BUS_UNSET, -EPERM);
223         assert_return(input_fd >= 0, -EINVAL);
224         assert_return(output_fd >= 0, -EINVAL);
225         assert_return(!bus_pid_changed(bus), -ECHILD);
226
227         bus->input_fd = input_fd;
228         bus->output_fd = output_fd;
229         return 0;
230 }
231
232 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
233         char *p, **a;
234
235         assert_return(bus, -EINVAL);
236         assert_return(bus->state == BUS_UNSET, -EPERM);
237         assert_return(path, -EINVAL);
238         assert_return(!strv_isempty(argv), -EINVAL);
239         assert_return(!bus_pid_changed(bus), -ECHILD);
240
241         p = strdup(path);
242         if (!p)
243                 return -ENOMEM;
244
245         a = strv_copy(argv);
246         if (!a) {
247                 free(p);
248                 return -ENOMEM;
249         }
250
251         free(bus->exec_path);
252         strv_free(bus->exec_argv);
253
254         bus->exec_path = p;
255         bus->exec_argv = a;
256
257         return 0;
258 }
259
260 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
261         assert_return(bus, -EINVAL);
262         assert_return(bus->state == BUS_UNSET, -EPERM);
263         assert_return(!bus_pid_changed(bus), -ECHILD);
264
265         bus->bus_client = !!b;
266         return 0;
267 }
268
269 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
270         assert_return(bus, -EINVAL);
271         assert_return(bus->state == BUS_UNSET, -EPERM);
272         assert_return(!bus_pid_changed(bus), -ECHILD);
273
274         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
275         return 0;
276 }
277
278 _public_ int sd_bus_negotiate_attach_timestamp(sd_bus *bus, int b) {
279         assert_return(bus, -EINVAL);
280         assert_return(bus->state == BUS_UNSET, -EPERM);
281         assert_return(!bus_pid_changed(bus), -ECHILD);
282
283         SET_FLAG(bus->attach_flags, KDBUS_ATTACH_TIMESTAMP, b);
284         return 0;
285 }
286
287 _public_ int sd_bus_negotiate_attach_creds(sd_bus *bus, uint64_t mask) {
288         assert_return(bus, -EINVAL);
289         assert_return(mask <= _SD_BUS_CREDS_MAX, -EINVAL);
290         assert_return(bus->state == BUS_UNSET, -EPERM);
291         assert_return(!bus_pid_changed(bus), -ECHILD);
292
293         return kdbus_translate_attach_flags(mask, &bus->creds_mask);
294 }
295
296 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
297         assert_return(bus, -EINVAL);
298         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
299         assert_return(bus->state == BUS_UNSET, -EPERM);
300         assert_return(!bus_pid_changed(bus), -ECHILD);
301
302         bus->is_server = !!b;
303         bus->server_id = server_id;
304         return 0;
305 }
306
307 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
308         assert_return(bus, -EINVAL);
309         assert_return(bus->state == BUS_UNSET, -EPERM);
310         assert_return(!bus_pid_changed(bus), -ECHILD);
311
312         bus->anonymous_auth = !!b;
313         return 0;
314 }
315
316 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata, sd_bus_error *error) {
317         const char *s;
318         int r;
319
320         assert(bus);
321         assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
322         assert(reply);
323
324         r = sd_bus_message_get_errno(reply);
325         if (r < 0)
326                 return r;
327         if (r > 0)
328                 return -r;
329
330         r = sd_bus_message_read(reply, "s", &s);
331         if (r < 0)
332                 return r;
333
334         if (!service_name_is_valid(s) || s[0] != ':')
335                 return -EBADMSG;
336
337         bus->unique_name = strdup(s);
338         if (!bus->unique_name)
339                 return -ENOMEM;
340
341         if (bus->state == BUS_HELLO)
342                 bus->state = BUS_RUNNING;
343
344         return 1;
345 }
346
347 static int bus_send_hello(sd_bus *bus) {
348         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
349         int r;
350
351         assert(bus);
352
353         if (!bus->bus_client || bus->is_kernel)
354                 return 0;
355
356         r = sd_bus_message_new_method_call(
357                         bus,
358                         "org.freedesktop.DBus",
359                         "/",
360                         "org.freedesktop.DBus",
361                         "Hello",
362                         &m);
363         if (r < 0)
364                 return r;
365
366         return sd_bus_call_async(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
367 }
368
369 int bus_start_running(sd_bus *bus) {
370         assert(bus);
371
372         if (bus->bus_client && !bus->is_kernel) {
373                 bus->state = BUS_HELLO;
374                 return 1;
375         }
376
377         bus->state = BUS_RUNNING;
378         return 1;
379 }
380
381 static int parse_address_key(const char **p, const char *key, char **value) {
382         size_t l, n = 0;
383         const char *a;
384         char *r = NULL;
385
386         assert(p);
387         assert(*p);
388         assert(value);
389
390         if (key) {
391                 l = strlen(key);
392                 if (strncmp(*p, key, l) != 0)
393                         return 0;
394
395                 if ((*p)[l] != '=')
396                         return 0;
397
398                 if (*value)
399                         return -EINVAL;
400
401                 a = *p + l + 1;
402         } else
403                 a = *p;
404
405         while (*a != ';' && *a != ',' && *a != 0) {
406                 char c, *t;
407
408                 if (*a == '%') {
409                         int x, y;
410
411                         x = unhexchar(a[1]);
412                         if (x < 0) {
413                                 free(r);
414                                 return x;
415                         }
416
417                         y = unhexchar(a[2]);
418                         if (y < 0) {
419                                 free(r);
420                                 return y;
421                         }
422
423                         c = (char) ((x << 4) | y);
424                         a += 3;
425                 } else {
426                         c = *a;
427                         a++;
428                 }
429
430                 t = realloc(r, n + 2);
431                 if (!t) {
432                         free(r);
433                         return -ENOMEM;
434                 }
435
436                 r = t;
437                 r[n++] = c;
438         }
439
440         if (!r) {
441                 r = strdup("");
442                 if (!r)
443                         return -ENOMEM;
444         } else
445                 r[n] = 0;
446
447         if (*a == ',')
448                 a++;
449
450         *p = a;
451
452         free(*value);
453         *value = r;
454
455         return 1;
456 }
457
458 static void skip_address_key(const char **p) {
459         assert(p);
460         assert(*p);
461
462         *p += strcspn(*p, ",");
463
464         if (**p == ',')
465                 (*p) ++;
466 }
467
468 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
469         _cleanup_free_ char *path = NULL, *abstract = NULL;
470         size_t l;
471         int r;
472
473         assert(b);
474         assert(p);
475         assert(*p);
476         assert(guid);
477
478         while (**p != 0 && **p != ';') {
479                 r = parse_address_key(p, "guid", guid);
480                 if (r < 0)
481                         return r;
482                 else if (r > 0)
483                         continue;
484
485                 r = parse_address_key(p, "path", &path);
486                 if (r < 0)
487                         return r;
488                 else if (r > 0)
489                         continue;
490
491                 r = parse_address_key(p, "abstract", &abstract);
492                 if (r < 0)
493                         return r;
494                 else if (r > 0)
495                         continue;
496
497                 skip_address_key(p);
498         }
499
500         if (!path && !abstract)
501                 return -EINVAL;
502
503         if (path && abstract)
504                 return -EINVAL;
505
506         if (path) {
507                 l = strlen(path);
508                 if (l > sizeof(b->sockaddr.un.sun_path))
509                         return -E2BIG;
510
511                 b->sockaddr.un.sun_family = AF_UNIX;
512                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
513                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
514         } else if (abstract) {
515                 l = strlen(abstract);
516                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
517                         return -E2BIG;
518
519                 b->sockaddr.un.sun_family = AF_UNIX;
520                 b->sockaddr.un.sun_path[0] = 0;
521                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
522                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
523         }
524
525         return 0;
526 }
527
528 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
529         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
530         int r;
531         struct addrinfo *result, hints = {
532                 .ai_socktype = SOCK_STREAM,
533                 .ai_flags = AI_ADDRCONFIG,
534         };
535
536         assert(b);
537         assert(p);
538         assert(*p);
539         assert(guid);
540
541         while (**p != 0 && **p != ';') {
542                 r = parse_address_key(p, "guid", guid);
543                 if (r < 0)
544                         return r;
545                 else if (r > 0)
546                         continue;
547
548                 r = parse_address_key(p, "host", &host);
549                 if (r < 0)
550                         return r;
551                 else if (r > 0)
552                         continue;
553
554                 r = parse_address_key(p, "port", &port);
555                 if (r < 0)
556                         return r;
557                 else if (r > 0)
558                         continue;
559
560                 r = parse_address_key(p, "family", &family);
561                 if (r < 0)
562                         return r;
563                 else if (r > 0)
564                         continue;
565
566                 skip_address_key(p);
567         }
568
569         if (!host || !port)
570                 return -EINVAL;
571
572         if (family) {
573                 if (streq(family, "ipv4"))
574                         hints.ai_family = AF_INET;
575                 else if (streq(family, "ipv6"))
576                         hints.ai_family = AF_INET6;
577                 else
578                         return -EINVAL;
579         }
580
581         r = getaddrinfo(host, port, &hints, &result);
582         if (r == EAI_SYSTEM)
583                 return -errno;
584         else if (r != 0)
585                 return -EADDRNOTAVAIL;
586
587         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
588         b->sockaddr_size = result->ai_addrlen;
589
590         freeaddrinfo(result);
591
592         return 0;
593 }
594
595 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
596         char *path = NULL;
597         unsigned n_argv = 0, j;
598         char **argv = NULL;
599         int r;
600
601         assert(b);
602         assert(p);
603         assert(*p);
604         assert(guid);
605
606         while (**p != 0 && **p != ';') {
607                 r = parse_address_key(p, "guid", guid);
608                 if (r < 0)
609                         goto fail;
610                 else if (r > 0)
611                         continue;
612
613                 r = parse_address_key(p, "path", &path);
614                 if (r < 0)
615                         goto fail;
616                 else if (r > 0)
617                         continue;
618
619                 if (startswith(*p, "argv")) {
620                         unsigned ul;
621
622                         errno = 0;
623                         ul = strtoul(*p + 4, (char**) p, 10);
624                         if (errno > 0 || **p != '=' || ul > 256) {
625                                 r = -EINVAL;
626                                 goto fail;
627                         }
628
629                         (*p) ++;
630
631                         if (ul >= n_argv) {
632                                 char **x;
633
634                                 x = realloc(argv, sizeof(char*) * (ul + 2));
635                                 if (!x) {
636                                         r = -ENOMEM;
637                                         goto fail;
638                                 }
639
640                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
641
642                                 argv = x;
643                                 n_argv = ul + 1;
644                         }
645
646                         r = parse_address_key(p, NULL, argv + ul);
647                         if (r < 0)
648                                 goto fail;
649
650                         continue;
651                 }
652
653                 skip_address_key(p);
654         }
655
656         if (!path) {
657                 r = -EINVAL;
658                 goto fail;
659         }
660
661         /* Make sure there are no holes in the array, with the
662          * exception of argv[0] */
663         for (j = 1; j < n_argv; j++)
664                 if (!argv[j]) {
665                         r = -EINVAL;
666                         goto fail;
667                 }
668
669         if (argv && argv[0] == NULL) {
670                 argv[0] = strdup(path);
671                 if (!argv[0]) {
672                         r = -ENOMEM;
673                         goto fail;
674                 }
675         }
676
677         b->exec_path = path;
678         b->exec_argv = argv;
679         return 0;
680
681 fail:
682         for (j = 0; j < n_argv; j++)
683                 free(argv[j]);
684
685         free(argv);
686         free(path);
687         return r;
688 }
689
690 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
691         _cleanup_free_ char *path = NULL;
692         int r;
693
694         assert(b);
695         assert(p);
696         assert(*p);
697         assert(guid);
698
699         while (**p != 0 && **p != ';') {
700                 r = parse_address_key(p, "guid", guid);
701                 if (r < 0)
702                         return r;
703                 else if (r > 0)
704                         continue;
705
706                 r = parse_address_key(p, "path", &path);
707                 if (r < 0)
708                         return r;
709                 else if (r > 0)
710                         continue;
711
712                 skip_address_key(p);
713         }
714
715         if (!path)
716                 return -EINVAL;
717
718         free(b->kernel);
719         b->kernel = path;
720         path = NULL;
721
722         return 0;
723 }
724
725 static int parse_container_address(sd_bus *b, const char **p, char **guid) {
726         _cleanup_free_ char *machine = NULL;
727         int r;
728
729         assert(b);
730         assert(p);
731         assert(*p);
732         assert(guid);
733
734         while (**p != 0 && **p != ';') {
735                 r = parse_address_key(p, "guid", guid);
736                 if (r < 0)
737                         return r;
738                 else if (r > 0)
739                         continue;
740
741                 r = parse_address_key(p, "machine", &machine);
742                 if (r < 0)
743                         return r;
744                 else if (r > 0)
745                         continue;
746
747                 skip_address_key(p);
748         }
749
750         if (!machine)
751                 return -EINVAL;
752
753         free(b->machine);
754         b->machine = machine;
755         machine = NULL;
756
757         b->sockaddr.un.sun_family = AF_UNIX;
758         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
759         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/var/run/dbus/system_bus_socket") - 1;
760
761         return 0;
762 }
763
764 static void bus_reset_parsed_address(sd_bus *b) {
765         assert(b);
766
767         zero(b->sockaddr);
768         b->sockaddr_size = 0;
769         strv_free(b->exec_argv);
770         free(b->exec_path);
771         b->exec_path = NULL;
772         b->exec_argv = NULL;
773         b->server_id = SD_ID128_NULL;
774         free(b->kernel);
775         b->kernel = NULL;
776         free(b->machine);
777         b->machine = NULL;
778 }
779
780 static int bus_parse_next_address(sd_bus *b) {
781         _cleanup_free_ char *guid = NULL;
782         const char *a;
783         int r;
784
785         assert(b);
786
787         if (!b->address)
788                 return 0;
789         if (b->address[b->address_index] == 0)
790                 return 0;
791
792         bus_reset_parsed_address(b);
793
794         a = b->address + b->address_index;
795
796         while (*a != 0) {
797
798                 if (*a == ';') {
799                         a++;
800                         continue;
801                 }
802
803                 if (startswith(a, "unix:")) {
804                         a += 5;
805
806                         r = parse_unix_address(b, &a, &guid);
807                         if (r < 0)
808                                 return r;
809                         break;
810
811                 } else if (startswith(a, "tcp:")) {
812
813                         a += 4;
814                         r = parse_tcp_address(b, &a, &guid);
815                         if (r < 0)
816                                 return r;
817
818                         break;
819
820                 } else if (startswith(a, "unixexec:")) {
821
822                         a += 9;
823                         r = parse_exec_address(b, &a, &guid);
824                         if (r < 0)
825                                 return r;
826
827                         break;
828
829                 } else if (startswith(a, "kernel:")) {
830
831                         a += 7;
832                         r = parse_kernel_address(b, &a, &guid);
833                         if (r < 0)
834                                 return r;
835
836                         break;
837                 } else if (startswith(a, "x-container:")) {
838
839                         a += 12;
840                         r = parse_container_address(b, &a, &guid);
841                         if (r < 0)
842                                 return r;
843
844                         break;
845                 }
846
847                 a = strchr(a, ';');
848                 if (!a)
849                         return 0;
850         }
851
852         if (guid) {
853                 r = sd_id128_from_string(guid, &b->server_id);
854                 if (r < 0)
855                         return r;
856         }
857
858         b->address_index = a - b->address;
859         return 1;
860 }
861
862 static int bus_start_address(sd_bus *b) {
863         int r;
864
865         assert(b);
866
867         for (;;) {
868                 sd_bus_close(b);
869
870                 if (b->exec_path) {
871
872                         r = bus_socket_exec(b);
873                         if (r >= 0)
874                                 return r;
875
876                         b->last_connect_error = -r;
877                 } else if (b->kernel) {
878
879                         r = bus_kernel_connect(b);
880                         if (r >= 0)
881                                 return r;
882
883                         b->last_connect_error = -r;
884
885                 } else if (b->machine) {
886
887                         r = bus_container_connect(b);
888                         if (r >= 0)
889                                 return r;
890
891                         b->last_connect_error = -r;
892
893                 } else if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
894
895                         r = bus_socket_connect(b);
896                         if (r >= 0)
897                                 return r;
898
899                         b->last_connect_error = -r;
900                 }
901
902                 r = bus_parse_next_address(b);
903                 if (r < 0)
904                         return r;
905                 if (r == 0)
906                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
907         }
908 }
909
910 int bus_next_address(sd_bus *b) {
911         assert(b);
912
913         bus_reset_parsed_address(b);
914         return bus_start_address(b);
915 }
916
917 static int bus_start_fd(sd_bus *b) {
918         struct stat st;
919         int r;
920
921         assert(b);
922         assert(b->input_fd >= 0);
923         assert(b->output_fd >= 0);
924
925         r = fd_nonblock(b->input_fd, true);
926         if (r < 0)
927                 return r;
928
929         r = fd_cloexec(b->input_fd, true);
930         if (r < 0)
931                 return r;
932
933         if (b->input_fd != b->output_fd) {
934                 r = fd_nonblock(b->output_fd, true);
935                 if (r < 0)
936                         return r;
937
938                 r = fd_cloexec(b->output_fd, true);
939                 if (r < 0)
940                         return r;
941         }
942
943         if (fstat(b->input_fd, &st) < 0)
944                 return -errno;
945
946         if (S_ISCHR(b->input_fd))
947                 return bus_kernel_take_fd(b);
948         else
949                 return bus_socket_take_fd(b);
950 }
951
952 _public_ int sd_bus_start(sd_bus *bus) {
953         int r;
954
955         assert_return(bus, -EINVAL);
956         assert_return(bus->state == BUS_UNSET, -EPERM);
957         assert_return(!bus_pid_changed(bus), -ECHILD);
958
959         bus->state = BUS_OPENING;
960
961         if (bus->is_server && bus->bus_client)
962                 return -EINVAL;
963
964         if (bus->input_fd >= 0)
965                 r = bus_start_fd(bus);
966         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
967                 r = bus_start_address(bus);
968         else
969                 return -EINVAL;
970
971         if (r < 0)
972                 return r;
973
974         return bus_send_hello(bus);
975 }
976
977 _public_ int sd_bus_open_system(sd_bus **ret) {
978         const char *e;
979         sd_bus *b;
980         int r;
981
982         assert_return(ret, -EINVAL);
983
984         r = sd_bus_new(&b);
985         if (r < 0)
986                 return r;
987
988         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
989         if (e) {
990                 r = sd_bus_set_address(b, e);
991                 if (r < 0)
992                         goto fail;
993         } else {
994                 b->sockaddr.un.sun_family = AF_UNIX;
995                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
996                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
997         }
998
999         b->bus_client = true;
1000
1001         r = sd_bus_start(b);
1002         if (r < 0)
1003                 goto fail;
1004
1005         *ret = b;
1006         return 0;
1007
1008 fail:
1009         bus_free(b);
1010         return r;
1011 }
1012
1013 _public_ int sd_bus_open_user(sd_bus **ret) {
1014         const char *e;
1015         sd_bus *b;
1016         size_t l;
1017         int r;
1018
1019         assert_return(ret, -EINVAL);
1020
1021         r = sd_bus_new(&b);
1022         if (r < 0)
1023                 return r;
1024
1025         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1026         if (e) {
1027                 r = sd_bus_set_address(b, e);
1028                 if (r < 0)
1029                         goto fail;
1030         } else {
1031                 e = secure_getenv("XDG_RUNTIME_DIR");
1032                 if (!e) {
1033                         r = -ENOENT;
1034                         goto fail;
1035                 }
1036
1037                 l = strlen(e);
1038                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1039                         r = -E2BIG;
1040                         goto fail;
1041                 }
1042
1043                 b->sockaddr.un.sun_family = AF_UNIX;
1044                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1045                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1046         }
1047
1048         b->bus_client = true;
1049
1050         r = sd_bus_start(b);
1051         if (r < 0)
1052                 goto fail;
1053
1054         *ret = b;
1055         return 0;
1056
1057 fail:
1058         bus_free(b);
1059         return r;
1060 }
1061
1062 _public_ int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1063         _cleanup_free_ char *e = NULL;
1064         char *p = NULL;
1065         sd_bus *bus;
1066         int r;
1067
1068         assert_return(host, -EINVAL);
1069         assert_return(ret, -EINVAL);
1070
1071         e = bus_address_escape(host);
1072         if (!e)
1073                 return -ENOMEM;
1074
1075         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1076         if (!p)
1077                 return -ENOMEM;
1078
1079         r = sd_bus_new(&bus);
1080         if (r < 0) {
1081                 free(p);
1082                 return r;
1083         }
1084
1085         bus->address = p;
1086         bus->bus_client = true;
1087
1088         r = sd_bus_start(bus);
1089         if (r < 0) {
1090                 bus_free(bus);
1091                 return r;
1092         }
1093
1094         *ret = bus;
1095         return 0;
1096 }
1097
1098 _public_ int sd_bus_open_system_container(const char *machine, sd_bus **ret) {
1099         _cleanup_free_ char *e = NULL;
1100         sd_bus *bus;
1101         char *p;
1102         int r;
1103
1104         assert_return(machine, -EINVAL);
1105         assert_return(ret, -EINVAL);
1106
1107         e = bus_address_escape(machine);
1108         if (!e)
1109                 return -ENOMEM;
1110
1111         p = strjoin("x-container:machine=", e, NULL);
1112         if (!p)
1113                 return -ENOMEM;
1114
1115         r = sd_bus_new(&bus);
1116         if (r < 0) {
1117                 free(p);
1118                 return r;
1119         }
1120
1121         bus->address = p;
1122         bus->bus_client = true;
1123
1124         r = sd_bus_start(bus);
1125         if (r < 0) {
1126                 bus_free(bus);
1127                 return r;
1128         }
1129
1130         *ret = bus;
1131         return 0;
1132 }
1133
1134 _public_ void sd_bus_close(sd_bus *bus) {
1135
1136         if (!bus)
1137                 return;
1138         if (bus->state == BUS_CLOSED)
1139                 return;
1140         if (bus_pid_changed(bus))
1141                 return;
1142
1143         bus->state = BUS_CLOSED;
1144
1145         sd_bus_detach_event(bus);
1146
1147         /* Drop all queued messages so that they drop references to
1148          * the bus object and the bus may be freed */
1149         bus_reset_queues(bus);
1150
1151         if (!bus->is_kernel)
1152                 bus_close_fds(bus);
1153
1154         /* We'll leave the fd open in case this is a kernel bus, since
1155          * there might still be memblocks around that reference this
1156          * bus, and they might need to invoke the
1157          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1158          * freed. */
1159 }
1160
1161 static void bus_enter_closing(sd_bus *bus) {
1162         assert(bus);
1163
1164         if (bus->state != BUS_OPENING &&
1165             bus->state != BUS_AUTHENTICATING &&
1166             bus->state != BUS_HELLO &&
1167             bus->state != BUS_RUNNING)
1168                 return;
1169
1170         bus->state = BUS_CLOSING;
1171 }
1172
1173 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1174         assert_return(bus, NULL);
1175
1176         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1177
1178         return bus;
1179 }
1180
1181 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1182         assert_return(bus, NULL);
1183
1184         if (REFCNT_DEC(bus->n_ref) <= 0)
1185                 bus_free(bus);
1186
1187         return NULL;
1188 }
1189
1190 _public_ int sd_bus_is_open(sd_bus *bus) {
1191
1192         assert_return(bus, -EINVAL);
1193         assert_return(!bus_pid_changed(bus), -ECHILD);
1194
1195         return BUS_IS_OPEN(bus->state);
1196 }
1197
1198 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1199         int r;
1200
1201         assert_return(bus, -EINVAL);
1202         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1203         assert_return(!bus_pid_changed(bus), -ECHILD);
1204
1205         if (type == SD_BUS_TYPE_UNIX_FD) {
1206                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1207                         return 0;
1208
1209                 r = bus_ensure_running(bus);
1210                 if (r < 0)
1211                         return r;
1212
1213                 return bus->can_fds;
1214         }
1215
1216         return bus_type_is_valid(type);
1217 }
1218
1219 _public_ int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1220         int r;
1221
1222         assert_return(bus, -EINVAL);
1223         assert_return(server_id, -EINVAL);
1224         assert_return(!bus_pid_changed(bus), -ECHILD);
1225
1226         r = bus_ensure_running(bus);
1227         if (r < 0)
1228                 return r;
1229
1230         *server_id = bus->server_id;
1231         return 0;
1232 }
1233
1234 int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1235         assert(m);
1236
1237         if (m->header->version > b->message_version)
1238                 return -EPERM;
1239
1240         if (m->sealed) {
1241                 /* If we copy the same message to multiple
1242                  * destinations, avoid using the same serial
1243                  * numbers. */
1244                 b->serial = MAX(b->serial, BUS_MESSAGE_SERIAL(m));
1245                 return 0;
1246         }
1247
1248         return bus_message_seal(m, ++b->serial);
1249 }
1250
1251 static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
1252         int r;
1253
1254         assert(bus);
1255         assert(message);
1256
1257         if (bus->is_kernel)
1258                 r = bus_kernel_write_message(bus, message);
1259         else
1260                 r = bus_socket_write_message(bus, message, idx);
1261
1262         return r;
1263 }
1264
1265 static int dispatch_wqueue(sd_bus *bus) {
1266         int r, ret = 0;
1267
1268         assert(bus);
1269         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1270
1271         while (bus->wqueue_size > 0) {
1272
1273                 r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
1274                 if (r < 0)
1275                         return r;
1276                 else if (r == 0)
1277                         /* Didn't do anything this time */
1278                         return ret;
1279                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1280                         /* Fully written. Let's drop the entry from
1281                          * the queue.
1282                          *
1283                          * This isn't particularly optimized, but
1284                          * well, this is supposed to be our worst-case
1285                          * buffer only, and the socket buffer is
1286                          * supposed to be our primary buffer, and if
1287                          * it got full, then all bets are off
1288                          * anyway. */
1289
1290                         sd_bus_message_unref(bus->wqueue[0]);
1291                         bus->wqueue_size --;
1292                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1293                         bus->windex = 0;
1294
1295                         ret = 1;
1296                 }
1297         }
1298
1299         return ret;
1300 }
1301
1302 static int bus_read_message(sd_bus *bus) {
1303         assert(bus);
1304
1305         if (bus->is_kernel)
1306                 return bus_kernel_read_message(bus);
1307         else
1308                 return bus_socket_read_message(bus);
1309 }
1310
1311 int bus_rqueue_make_room(sd_bus *bus, unsigned n) {
1312         sd_bus_message **q;
1313         unsigned x;
1314
1315         x = bus->rqueue_size + n;
1316
1317         if (bus->rqueue_allocated >= x)
1318                 return 0;
1319
1320         if (x > BUS_RQUEUE_MAX)
1321                 return -ENOBUFS;
1322
1323         q = realloc(bus->rqueue, x * sizeof(sd_bus_message*));
1324         if (!q)
1325                 return -ENOMEM;
1326
1327         bus->rqueue = q;
1328         bus->rqueue_allocated = x;
1329
1330         return 0;
1331 }
1332
1333 int bus_rqueue_push(sd_bus *bus, sd_bus_message *m) {
1334         int r;
1335
1336         assert(bus);
1337         assert(m);
1338
1339         r = bus_rqueue_make_room(bus, 1);
1340         if (r < 0)
1341                 return r;
1342
1343         bus->rqueue[bus->rqueue_size++] = m;
1344
1345         return 0;
1346 }
1347
1348 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1349         int r, ret = 0;
1350
1351         assert(bus);
1352         assert(m);
1353         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1354
1355         for (;;) {
1356                 if (bus->rqueue_size > 0) {
1357                         /* Dispatch a queued message */
1358
1359                         *m = bus->rqueue[0];
1360                         bus->rqueue_size --;
1361                         memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1362                         return 1;
1363                 }
1364
1365                 /* Try to read a new message */
1366                 r = bus_read_message(bus);
1367                 if (r < 0)
1368                         return r;
1369                 if (r == 0)
1370                         return ret;
1371
1372                 ret = 1;
1373         }
1374 }
1375
1376 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1377         int r;
1378
1379         assert_return(bus, -EINVAL);
1380         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1381         assert_return(m, -EINVAL);
1382         assert_return(!bus_pid_changed(bus), -ECHILD);
1383
1384         if (m->n_fds > 0) {
1385                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1386                 if (r < 0)
1387                         return r;
1388                 if (r == 0)
1389                         return -ENOTSUP;
1390         }
1391
1392         /* If the serial number isn't kept, then we know that no reply
1393          * is expected */
1394         if (!serial && !m->sealed)
1395                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1396
1397         r = bus_seal_message(bus, m);
1398         if (r < 0)
1399                 return r;
1400
1401         /* If this is a reply and no reply was requested, then let's
1402          * suppress this, if we can */
1403         if (m->dont_send && !serial)
1404                 return 1;
1405
1406         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1407                 size_t idx = 0;
1408
1409                 r = bus_write_message(bus, m, &idx);
1410                 if (r < 0)
1411                         return r;
1412                 else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1413                         /* Wasn't fully written. So let's remember how
1414                          * much was written. Note that the first entry
1415                          * of the wqueue array is always allocated so
1416                          * that we always can remember how much was
1417                          * written. */
1418                         bus->wqueue[0] = sd_bus_message_ref(m);
1419                         bus->wqueue_size = 1;
1420                         bus->windex = idx;
1421                 }
1422         } else {
1423                 sd_bus_message **q;
1424
1425                 /* Just append it to the queue. */
1426
1427                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1428                         return -ENOBUFS;
1429
1430                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1431                 if (!q)
1432                         return -ENOMEM;
1433
1434                 bus->wqueue = q;
1435                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1436         }
1437
1438         if (serial)
1439                 *serial = BUS_MESSAGE_SERIAL(m);
1440
1441         return 1;
1442 }
1443
1444 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *serial) {
1445         int r;
1446
1447         assert_return(bus, -EINVAL);
1448         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1449         assert_return(m, -EINVAL);
1450         assert_return(!bus_pid_changed(bus), -ECHILD);
1451
1452         if (!streq_ptr(m->destination, destination)) {
1453
1454                 if (!destination)
1455                         return -EEXIST;
1456
1457                 r = sd_bus_message_set_destination(m, destination);
1458                 if (r < 0)
1459                         return r;
1460         }
1461
1462         return sd_bus_send(bus, m, serial);
1463 }
1464
1465 static usec_t calc_elapse(uint64_t usec) {
1466         if (usec == (uint64_t) -1)
1467                 return 0;
1468
1469         if (usec == 0)
1470                 usec = BUS_DEFAULT_TIMEOUT;
1471
1472         return now(CLOCK_MONOTONIC) + usec;
1473 }
1474
1475 static int timeout_compare(const void *a, const void *b) {
1476         const struct reply_callback *x = a, *y = b;
1477
1478         if (x->timeout != 0 && y->timeout == 0)
1479                 return -1;
1480
1481         if (x->timeout == 0 && y->timeout != 0)
1482                 return 1;
1483
1484         if (x->timeout < y->timeout)
1485                 return -1;
1486
1487         if (x->timeout > y->timeout)
1488                 return 1;
1489
1490         return 0;
1491 }
1492
1493 _public_ int sd_bus_call_async(
1494                 sd_bus *bus,
1495                 sd_bus_message *m,
1496                 sd_bus_message_handler_t callback,
1497                 void *userdata,
1498                 uint64_t usec,
1499                 uint64_t *serial) {
1500
1501         struct reply_callback *c;
1502         int r;
1503
1504         assert_return(bus, -EINVAL);
1505         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1506         assert_return(m, -EINVAL);
1507         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1508         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1509         assert_return(callback, -EINVAL);
1510         assert_return(!bus_pid_changed(bus), -ECHILD);
1511
1512         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1513         if (r < 0)
1514                 return r;
1515
1516         if (usec != (uint64_t) -1) {
1517                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1518                 if (r < 0)
1519                         return r;
1520         }
1521
1522         r = bus_seal_message(bus, m);
1523         if (r < 0)
1524                 return r;
1525
1526         c = new0(struct reply_callback, 1);
1527         if (!c)
1528                 return -ENOMEM;
1529
1530         c->callback = callback;
1531         c->userdata = userdata;
1532         c->serial = BUS_MESSAGE_SERIAL(m);
1533         c->timeout = calc_elapse(usec);
1534
1535         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1536         if (r < 0) {
1537                 free(c);
1538                 return r;
1539         }
1540
1541         if (c->timeout != 0) {
1542                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1543                 if (r < 0) {
1544                         c->timeout = 0;
1545                         sd_bus_call_async_cancel(bus, c->serial);
1546                         return r;
1547                 }
1548         }
1549
1550         r = sd_bus_send(bus, m, serial);
1551         if (r < 0) {
1552                 sd_bus_call_async_cancel(bus, c->serial);
1553                 return r;
1554         }
1555
1556         return r;
1557 }
1558
1559 _public_ int sd_bus_call_async_cancel(sd_bus *bus, uint64_t serial) {
1560         struct reply_callback *c;
1561
1562         assert_return(bus, -EINVAL);
1563         assert_return(serial != 0, -EINVAL);
1564         assert_return(!bus_pid_changed(bus), -ECHILD);
1565
1566         c = hashmap_remove(bus->reply_callbacks, &serial);
1567         if (!c)
1568                 return 0;
1569
1570         if (c->timeout != 0)
1571                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1572
1573         free(c);
1574         return 1;
1575 }
1576
1577 int bus_ensure_running(sd_bus *bus) {
1578         int r;
1579
1580         assert(bus);
1581
1582         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1583                 return -ENOTCONN;
1584         if (bus->state == BUS_RUNNING)
1585                 return 1;
1586
1587         for (;;) {
1588                 r = sd_bus_process(bus, NULL);
1589                 if (r < 0)
1590                         return r;
1591                 if (bus->state == BUS_RUNNING)
1592                         return 1;
1593                 if (r > 0)
1594                         continue;
1595
1596                 r = sd_bus_wait(bus, (uint64_t) -1);
1597                 if (r < 0)
1598                         return r;
1599         }
1600 }
1601
1602 _public_ int sd_bus_call(
1603                 sd_bus *bus,
1604                 sd_bus_message *m,
1605                 uint64_t usec,
1606                 sd_bus_error *error,
1607                 sd_bus_message **reply) {
1608
1609         usec_t timeout;
1610         uint64_t serial;
1611         unsigned i;
1612         int r;
1613
1614         assert_return(bus, -EINVAL);
1615         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1616         assert_return(m, -EINVAL);
1617         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1618         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1619         assert_return(!bus_error_is_dirty(error), -EINVAL);
1620         assert_return(!bus_pid_changed(bus), -ECHILD);
1621
1622         r = bus_ensure_running(bus);
1623         if (r < 0)
1624                 return r;
1625
1626         r = sd_bus_send(bus, m, &serial);
1627         if (r < 0)
1628                 return r;
1629
1630         timeout = calc_elapse(usec);
1631         i = bus->rqueue_size;
1632
1633         for (;;) {
1634                 usec_t left;
1635
1636                 r = bus_read_message(bus);
1637                 if (r < 0)
1638                         return r;
1639
1640                 while (i < bus->rqueue_size) {
1641                         sd_bus_message *incoming = NULL;
1642
1643                         incoming = bus->rqueue[i];
1644
1645                         if (incoming->reply_serial == serial) {
1646                                 /* Found a match! */
1647
1648                                 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1649                                 bus->rqueue_size--;
1650
1651                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1652
1653                                         if (reply)
1654                                                 *reply = incoming;
1655                                         else
1656                                                 sd_bus_message_unref(incoming);
1657
1658                                         return 1;
1659                                 }
1660
1661                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1662                                         int k;
1663
1664                                         r = sd_bus_error_copy(error, &incoming->error);
1665                                         if (r < 0) {
1666                                                 sd_bus_message_unref(incoming);
1667                                                 return r;
1668                                         }
1669
1670                                         k = sd_bus_error_get_errno(&incoming->error);
1671                                         sd_bus_message_unref(incoming);
1672                                         return -k;
1673                                 }
1674
1675                                 sd_bus_message_unref(incoming);
1676                                 return -EIO;
1677
1678                         } else if (incoming->header->serial == serial &&
1679                                    bus->unique_name &&
1680                                    incoming->sender &&
1681                                    streq(bus->unique_name, incoming->sender)) {
1682
1683                                 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1684                                 bus->rqueue_size--;
1685
1686                                 /* Our own message? Somebody is trying
1687                                  * to send its own client a message,
1688                                  * let's not dead-lock, let's fail
1689                                  * immediately. */
1690
1691                                 sd_bus_message_unref(incoming);
1692                                 return -ELOOP;
1693                         }
1694
1695                         /* Try to read more, right-away */
1696                         i++;
1697                 }
1698
1699                 if (r > 0)
1700                         continue;
1701
1702                 if (timeout > 0) {
1703                         usec_t n;
1704
1705                         n = now(CLOCK_MONOTONIC);
1706                         if (n >= timeout)
1707                                 return -ETIMEDOUT;
1708
1709                         left = timeout - n;
1710                 } else
1711                         left = (uint64_t) -1;
1712
1713                 r = bus_poll(bus, true, left);
1714                 if (r < 0)
1715                         return r;
1716
1717                 r = dispatch_wqueue(bus);
1718                 if (r < 0)
1719                         return r;
1720         }
1721 }
1722
1723 _public_ int sd_bus_get_fd(sd_bus *bus) {
1724
1725         assert_return(bus, -EINVAL);
1726         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1727         assert_return(!bus_pid_changed(bus), -ECHILD);
1728
1729         return bus->input_fd;
1730 }
1731
1732 _public_ int sd_bus_get_events(sd_bus *bus) {
1733         int flags = 0;
1734
1735         assert_return(bus, -EINVAL);
1736         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1737         assert_return(!bus_pid_changed(bus), -ECHILD);
1738
1739         if (bus->state == BUS_OPENING)
1740                 flags |= POLLOUT;
1741         else if (bus->state == BUS_AUTHENTICATING) {
1742
1743                 if (bus_socket_auth_needs_write(bus))
1744                         flags |= POLLOUT;
1745
1746                 flags |= POLLIN;
1747
1748         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1749                 if (bus->rqueue_size <= 0)
1750                         flags |= POLLIN;
1751                 if (bus->wqueue_size > 0)
1752                         flags |= POLLOUT;
1753         }
1754
1755         return flags;
1756 }
1757
1758 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1759         struct reply_callback *c;
1760
1761         assert_return(bus, -EINVAL);
1762         assert_return(timeout_usec, -EINVAL);
1763         assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
1764         assert_return(!bus_pid_changed(bus), -ECHILD);
1765
1766         if (bus->state == BUS_CLOSING) {
1767                 *timeout_usec = 0;
1768                 return 1;
1769         }
1770
1771         if (bus->state == BUS_AUTHENTICATING) {
1772                 *timeout_usec = bus->auth_timeout;
1773                 return 1;
1774         }
1775
1776         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1777                 *timeout_usec = (uint64_t) -1;
1778                 return 0;
1779         }
1780
1781         if (bus->rqueue_size > 0) {
1782                 *timeout_usec = 0;
1783                 return 1;
1784         }
1785
1786         c = prioq_peek(bus->reply_callbacks_prioq);
1787         if (!c) {
1788                 *timeout_usec = (uint64_t) -1;
1789                 return 0;
1790         }
1791
1792         *timeout_usec = c->timeout;
1793         return 1;
1794 }
1795
1796 static int process_timeout(sd_bus *bus) {
1797         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1798         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1799         struct reply_callback *c;
1800         usec_t n;
1801         int r;
1802
1803         assert(bus);
1804
1805         c = prioq_peek(bus->reply_callbacks_prioq);
1806         if (!c)
1807                 return 0;
1808
1809         n = now(CLOCK_MONOTONIC);
1810         if (c->timeout > n)
1811                 return 0;
1812
1813         r = bus_message_new_synthetic_error(
1814                         bus,
1815                         c->serial,
1816                         &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1817                         &m);
1818         if (r < 0)
1819                 return r;
1820
1821         m->sender = "org.freedesktop.DBus";
1822
1823         r = bus_seal_message(bus, m);
1824         if (r < 0)
1825                 return r;
1826
1827         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1828         hashmap_remove(bus->reply_callbacks, &c->serial);
1829
1830         bus->current = m;
1831         bus->iteration_counter ++;
1832
1833         r = c->callback(bus, m, c->userdata, &error_buffer);
1834         r = bus_maybe_reply_error(m, r, &error_buffer);
1835         free(c);
1836
1837         bus->current = NULL;
1838
1839         return r;
1840 }
1841
1842 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1843         assert(bus);
1844         assert(m);
1845
1846         if (bus->state != BUS_HELLO)
1847                 return 0;
1848
1849         /* Let's make sure the first message on the bus is the HELLO
1850          * reply. But note that we don't actually parse the message
1851          * here (we leave that to the usual handling), we just verify
1852          * we don't let any earlier msg through. */
1853
1854         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1855             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1856                 return -EIO;
1857
1858         if (m->reply_serial != bus->hello_serial)
1859                 return -EIO;
1860
1861         return 0;
1862 }
1863
1864 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1865         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1866         struct reply_callback *c;
1867         int r;
1868
1869         assert(bus);
1870         assert(m);
1871
1872         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1873             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1874                 return 0;
1875
1876         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1877         if (!c)
1878                 return 0;
1879
1880         if (c->timeout != 0)
1881                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1882
1883         r = sd_bus_message_rewind(m, true);
1884         if (r < 0)
1885                 return r;
1886
1887         r = c->callback(bus, m, c->userdata, &error_buffer);
1888         r = bus_maybe_reply_error(m, r, &error_buffer);
1889         free(c);
1890
1891         return r;
1892 }
1893
1894 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1895         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
1896         struct filter_callback *l;
1897         int r;
1898
1899         assert(bus);
1900         assert(m);
1901
1902         do {
1903                 bus->filter_callbacks_modified = false;
1904
1905                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1906
1907                         if (bus->filter_callbacks_modified)
1908                                 break;
1909
1910                         /* Don't run this more than once per iteration */
1911                         if (l->last_iteration == bus->iteration_counter)
1912                                 continue;
1913
1914                         l->last_iteration = bus->iteration_counter;
1915
1916                         r = sd_bus_message_rewind(m, true);
1917                         if (r < 0)
1918                                 return r;
1919
1920                         r = l->callback(bus, m, l->userdata, &error_buffer);
1921                         r = bus_maybe_reply_error(m, r, &error_buffer);
1922                         if (r != 0)
1923                                 return r;
1924
1925                 }
1926
1927         } while (bus->filter_callbacks_modified);
1928
1929         return 0;
1930 }
1931
1932 static int process_match(sd_bus *bus, sd_bus_message *m) {
1933         int r;
1934
1935         assert(bus);
1936         assert(m);
1937
1938         do {
1939                 bus->match_callbacks_modified = false;
1940
1941                 r = bus_match_run(bus, &bus->match_callbacks, m);
1942                 if (r != 0)
1943                         return r;
1944
1945         } while (bus->match_callbacks_modified);
1946
1947         return 0;
1948 }
1949
1950 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1951         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1952         int r;
1953
1954         assert(bus);
1955         assert(m);
1956
1957         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1958                 return 0;
1959
1960         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1961                 return 0;
1962
1963         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1964                 return 1;
1965
1966         if (streq_ptr(m->member, "Ping"))
1967                 r = sd_bus_message_new_method_return(m, &reply);
1968         else if (streq_ptr(m->member, "GetMachineId")) {
1969                 sd_id128_t id;
1970                 char sid[33];
1971
1972                 r = sd_id128_get_machine(&id);
1973                 if (r < 0)
1974                         return r;
1975
1976                 r = sd_bus_message_new_method_return(m, &reply);
1977                 if (r < 0)
1978                         return r;
1979
1980                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1981         } else {
1982                 r = sd_bus_message_new_method_errorf(
1983                                 m, &reply,
1984                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1985                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1986         }
1987
1988         if (r < 0)
1989                 return r;
1990
1991         r = sd_bus_send(bus, reply, NULL);
1992         if (r < 0)
1993                 return r;
1994
1995         return 1;
1996 }
1997
1998 static int process_message(sd_bus *bus, sd_bus_message *m) {
1999         int r;
2000
2001         assert(bus);
2002         assert(m);
2003
2004         bus->current = m;
2005         bus->iteration_counter++;
2006
2007         log_debug("Got message sender=%s object=%s interface=%s member=%s",
2008                   strna(sd_bus_message_get_sender(m)),
2009                   strna(sd_bus_message_get_path(m)),
2010                   strna(sd_bus_message_get_interface(m)),
2011                   strna(sd_bus_message_get_member(m)));
2012
2013         r = process_hello(bus, m);
2014         if (r != 0)
2015                 goto finish;
2016
2017         r = process_reply(bus, m);
2018         if (r != 0)
2019                 goto finish;
2020
2021         r = process_filter(bus, m);
2022         if (r != 0)
2023                 goto finish;
2024
2025         r = process_match(bus, m);
2026         if (r != 0)
2027                 goto finish;
2028
2029         r = process_builtin(bus, m);
2030         if (r != 0)
2031                 goto finish;
2032
2033         r = bus_process_object(bus, m);
2034
2035 finish:
2036         bus->current = NULL;
2037         return r;
2038 }
2039
2040 static int process_running(sd_bus *bus, sd_bus_message **ret) {
2041         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2042         int r;
2043
2044         assert(bus);
2045         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2046
2047         r = process_timeout(bus);
2048         if (r != 0)
2049                 goto null_message;
2050
2051         r = dispatch_wqueue(bus);
2052         if (r != 0)
2053                 goto null_message;
2054
2055         r = dispatch_rqueue(bus, &m);
2056         if (r < 0)
2057                 return r;
2058         if (!m)
2059                 goto null_message;
2060
2061         r = process_message(bus, m);
2062         if (r != 0)
2063                 goto null_message;
2064
2065         if (ret) {
2066                 r = sd_bus_message_rewind(m, true);
2067                 if (r < 0)
2068                         return r;
2069
2070                 *ret = m;
2071                 m = NULL;
2072                 return 1;
2073         }
2074
2075         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2076
2077                 r = sd_bus_reply_method_errorf(
2078                                 m,
2079                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2080                                 "Unknown object '%s'.", m->path);
2081                 if (r < 0)
2082                         return r;
2083         }
2084
2085         return 1;
2086
2087 null_message:
2088         if (r >= 0 && ret)
2089                 *ret = NULL;
2090
2091         return r;
2092 }
2093
2094 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2095         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2096         struct reply_callback *c;
2097         int r;
2098
2099         assert(bus);
2100         assert(bus->state == BUS_CLOSING);
2101
2102         c = hashmap_first(bus->reply_callbacks);
2103         if (c) {
2104                 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2105
2106                 /* First, fail all outstanding method calls */
2107                 r = bus_message_new_synthetic_error(
2108                                 bus,
2109                                 c->serial,
2110                                 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2111                                 &m);
2112                 if (r < 0)
2113                         return r;
2114
2115                 r = bus_seal_message(bus, m);
2116                 if (r < 0)
2117                         return r;
2118
2119                 if (c->timeout != 0)
2120                         prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2121
2122                 hashmap_remove(bus->reply_callbacks, &c->serial);
2123
2124                 bus->current = m;
2125                 bus->iteration_counter++;
2126
2127                 r = c->callback(bus, m, c->userdata, &error_buffer);
2128                 r = bus_maybe_reply_error(m, r, &error_buffer);
2129                 free(c);
2130
2131                 goto finish;
2132         }
2133
2134         /* Then, synthesize a Disconnected message */
2135         r = sd_bus_message_new_signal(
2136                         bus,
2137                         "/org/freedesktop/DBus/Local",
2138                         "org.freedesktop.DBus.Local",
2139                         "Disconnected",
2140                         &m);
2141         if (r < 0)
2142                 return r;
2143
2144         m->sender = "org.freedesktop.DBus.Local";
2145
2146         r = bus_seal_message(bus, m);
2147         if (r < 0)
2148                 return r;
2149
2150         sd_bus_close(bus);
2151
2152         bus->current = m;
2153         bus->iteration_counter++;
2154
2155         r = process_filter(bus, m);
2156         if (r != 0)
2157                 goto finish;
2158
2159         r = process_match(bus, m);
2160         if (r != 0)
2161                 goto finish;
2162
2163         if (ret) {
2164                 *ret = m;
2165                 m = NULL;
2166         }
2167
2168         r = 1;
2169
2170 finish:
2171         bus->current = NULL;
2172         return r;
2173 }
2174
2175 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2176         BUS_DONT_DESTROY(bus);
2177         int r;
2178
2179         /* Returns 0 when we didn't do anything. This should cause the
2180          * caller to invoke sd_bus_wait() before returning the next
2181          * time. Returns > 0 when we did something, which possibly
2182          * means *ret is filled in with an unprocessed message. */
2183
2184         assert_return(bus, -EINVAL);
2185         assert_return(!bus_pid_changed(bus), -ECHILD);
2186
2187         /* We don't allow recursively invoking sd_bus_process(). */
2188         assert_return(!bus->current, -EBUSY);
2189
2190         switch (bus->state) {
2191
2192         case BUS_UNSET:
2193         case BUS_CLOSED:
2194                 return -ENOTCONN;
2195
2196         case BUS_OPENING:
2197                 r = bus_socket_process_opening(bus);
2198                 if (r == -ECONNRESET || r == -EPIPE) {
2199                         bus_enter_closing(bus);
2200                         r = 1;
2201                 } else if (r < 0)
2202                         return r;
2203                 if (ret)
2204                         *ret = NULL;
2205                 return r;
2206
2207         case BUS_AUTHENTICATING:
2208                 r = bus_socket_process_authenticating(bus);
2209                 if (r == -ECONNRESET || r == -EPIPE) {
2210                         bus_enter_closing(bus);
2211                         r = 1;
2212                 } else if (r < 0)
2213                         return r;
2214
2215                 if (ret)
2216                         *ret = NULL;
2217
2218                 return r;
2219
2220         case BUS_RUNNING:
2221         case BUS_HELLO:
2222                 r = process_running(bus, ret);
2223                 if (r == -ECONNRESET || r == -EPIPE) {
2224                         bus_enter_closing(bus);
2225                         r = 1;
2226
2227                         if (ret)
2228                                 *ret = NULL;
2229                 }
2230
2231                 return r;
2232
2233         case BUS_CLOSING:
2234                 return process_closing(bus, ret);
2235         }
2236
2237         assert_not_reached("Unknown state");
2238 }
2239
2240 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2241         struct pollfd p[2] = {};
2242         int r, e, n;
2243         struct timespec ts;
2244         usec_t m = (usec_t) -1;
2245
2246         assert(bus);
2247
2248         if (bus->state == BUS_CLOSING)
2249                 return 1;
2250
2251         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2252
2253         e = sd_bus_get_events(bus);
2254         if (e < 0)
2255                 return e;
2256
2257         if (need_more)
2258                 /* The caller really needs some more data, he doesn't
2259                  * care about what's already read, or any timeouts
2260                  * except its own.*/
2261                 e |= POLLIN;
2262         else {
2263                 usec_t until;
2264                 /* The caller wants to process if there's something to
2265                  * process, but doesn't care otherwise */
2266
2267                 r = sd_bus_get_timeout(bus, &until);
2268                 if (r < 0)
2269                         return r;
2270                 if (r > 0) {
2271                         usec_t nw;
2272                         nw = now(CLOCK_MONOTONIC);
2273                         m = until > nw ? until - nw : 0;
2274                 }
2275         }
2276
2277         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2278                 m = timeout_usec;
2279
2280         p[0].fd = bus->input_fd;
2281         if (bus->output_fd == bus->input_fd) {
2282                 p[0].events = e;
2283                 n = 1;
2284         } else {
2285                 p[0].events = e & POLLIN;
2286                 p[1].fd = bus->output_fd;
2287                 p[1].events = e & POLLOUT;
2288                 n = 2;
2289         }
2290
2291         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2292         if (r < 0)
2293                 return -errno;
2294
2295         return r > 0 ? 1 : 0;
2296 }
2297
2298 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2299
2300         assert_return(bus, -EINVAL);
2301         assert_return(!bus_pid_changed(bus), -ECHILD);
2302
2303         if (bus->state == BUS_CLOSING)
2304                 return 0;
2305
2306         assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
2307
2308         if (bus->rqueue_size > 0)
2309                 return 0;
2310
2311         return bus_poll(bus, false, timeout_usec);
2312 }
2313
2314 _public_ int sd_bus_flush(sd_bus *bus) {
2315         int r;
2316
2317         assert_return(bus, -EINVAL);
2318         assert_return(!bus_pid_changed(bus), -ECHILD);
2319
2320         if (bus->state == BUS_CLOSING)
2321                 return 0;
2322
2323         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2324
2325         r = bus_ensure_running(bus);
2326         if (r < 0)
2327                 return r;
2328
2329         if (bus->wqueue_size <= 0)
2330                 return 0;
2331
2332         for (;;) {
2333                 r = dispatch_wqueue(bus);
2334                 if (r < 0)
2335                         return r;
2336
2337                 if (bus->wqueue_size <= 0)
2338                         return 0;
2339
2340                 r = bus_poll(bus, false, (uint64_t) -1);
2341                 if (r < 0)
2342                         return r;
2343         }
2344 }
2345
2346 _public_ int sd_bus_add_filter(sd_bus *bus,
2347                                sd_bus_message_handler_t callback,
2348                                void *userdata) {
2349
2350         struct filter_callback *f;
2351
2352         assert_return(bus, -EINVAL);
2353         assert_return(callback, -EINVAL);
2354         assert_return(!bus_pid_changed(bus), -ECHILD);
2355
2356         f = new0(struct filter_callback, 1);
2357         if (!f)
2358                 return -ENOMEM;
2359         f->callback = callback;
2360         f->userdata = userdata;
2361
2362         bus->filter_callbacks_modified = true;
2363         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2364         return 0;
2365 }
2366
2367 _public_ int sd_bus_remove_filter(sd_bus *bus,
2368                                   sd_bus_message_handler_t callback,
2369                                   void *userdata) {
2370
2371         struct filter_callback *f;
2372
2373         assert_return(bus, -EINVAL);
2374         assert_return(callback, -EINVAL);
2375         assert_return(!bus_pid_changed(bus), -ECHILD);
2376
2377         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2378                 if (f->callback == callback && f->userdata == userdata) {
2379                         bus->filter_callbacks_modified = true;
2380                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2381                         free(f);
2382                         return 1;
2383                 }
2384         }
2385
2386         return 0;
2387 }
2388
2389 _public_ int sd_bus_add_match(sd_bus *bus,
2390                               const char *match,
2391                               sd_bus_message_handler_t callback,
2392                               void *userdata) {
2393
2394         struct bus_match_component *components = NULL;
2395         unsigned n_components = 0;
2396         uint64_t cookie = 0;
2397         int r = 0;
2398
2399         assert_return(bus, -EINVAL);
2400         assert_return(match, -EINVAL);
2401         assert_return(!bus_pid_changed(bus), -ECHILD);
2402
2403         r = bus_match_parse(match, &components, &n_components);
2404         if (r < 0)
2405                 goto finish;
2406
2407         if (bus->bus_client) {
2408                 cookie = ++bus->match_cookie;
2409
2410                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2411                 if (r < 0)
2412                         goto finish;
2413         }
2414
2415         bus->match_callbacks_modified = true;
2416         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2417         if (r < 0) {
2418                 if (bus->bus_client)
2419                         bus_remove_match_internal(bus, match, cookie);
2420         }
2421
2422 finish:
2423         bus_match_parse_free(components, n_components);
2424         return r;
2425 }
2426
2427 _public_ int sd_bus_remove_match(sd_bus *bus,
2428                                  const char *match,
2429                                  sd_bus_message_handler_t callback,
2430                                  void *userdata) {
2431
2432         struct bus_match_component *components = NULL;
2433         unsigned n_components = 0;
2434         int r = 0, q = 0;
2435         uint64_t cookie = 0;
2436
2437         assert_return(bus, -EINVAL);
2438         assert_return(match, -EINVAL);
2439         assert_return(!bus_pid_changed(bus), -ECHILD);
2440
2441         r = bus_match_parse(match, &components, &n_components);
2442         if (r < 0)
2443                 return r;
2444
2445         bus->match_callbacks_modified = true;
2446         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2447
2448         if (bus->bus_client)
2449                 q = bus_remove_match_internal(bus, match, cookie);
2450
2451         bus_match_parse_free(components, n_components);
2452
2453         return r < 0 ? r : q;
2454 }
2455
2456 bool bus_pid_changed(sd_bus *bus) {
2457         assert(bus);
2458
2459         /* We don't support people creating a bus connection and
2460          * keeping it around over a fork(). Let's complain. */
2461
2462         return bus->original_pid != getpid();
2463 }
2464
2465 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2466         sd_bus *bus = userdata;
2467         int r;
2468
2469         assert(bus);
2470
2471         r = sd_bus_process(bus, NULL);
2472         if (r < 0)
2473                 return r;
2474
2475         return 1;
2476 }
2477
2478 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2479         sd_bus *bus = userdata;
2480         int r;
2481
2482         assert(bus);
2483
2484         r = sd_bus_process(bus, NULL);
2485         if (r < 0)
2486                 return r;
2487
2488         return 1;
2489 }
2490
2491 static int prepare_callback(sd_event_source *s, void *userdata) {
2492         sd_bus *bus = userdata;
2493         int r, e;
2494         usec_t until;
2495
2496         assert(s);
2497         assert(bus);
2498
2499         e = sd_bus_get_events(bus);
2500         if (e < 0)
2501                 return e;
2502
2503         if (bus->output_fd != bus->input_fd) {
2504
2505                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2506                 if (r < 0)
2507                         return r;
2508
2509                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2510                 if (r < 0)
2511                         return r;
2512         } else {
2513                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2514                 if (r < 0)
2515                         return r;
2516         }
2517
2518         r = sd_bus_get_timeout(bus, &until);
2519         if (r < 0)
2520                 return r;
2521         if (r > 0) {
2522                 int j;
2523
2524                 j = sd_event_source_set_time(bus->time_event_source, until);
2525                 if (j < 0)
2526                         return j;
2527         }
2528
2529         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2530         if (r < 0)
2531                 return r;
2532
2533         return 1;
2534 }
2535
2536 static int quit_callback(sd_event_source *event, void *userdata) {
2537         sd_bus *bus = userdata;
2538
2539         assert(event);
2540
2541         sd_bus_flush(bus);
2542
2543         return 1;
2544 }
2545
2546 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2547         int r;
2548
2549         assert_return(bus, -EINVAL);
2550         assert_return(!bus->event, -EBUSY);
2551
2552         assert(!bus->input_io_event_source);
2553         assert(!bus->output_io_event_source);
2554         assert(!bus->time_event_source);
2555
2556         if (event)
2557                 bus->event = sd_event_ref(event);
2558         else  {
2559                 r = sd_event_default(&bus->event);
2560                 if (r < 0)
2561                         return r;
2562         }
2563
2564         r = sd_event_add_io(bus->event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2565         if (r < 0)
2566                 goto fail;
2567
2568         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2569         if (r < 0)
2570                 goto fail;
2571
2572         if (bus->output_fd != bus->input_fd) {
2573                 r = sd_event_add_io(bus->event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2574                 if (r < 0)
2575                         goto fail;
2576
2577                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2578                 if (r < 0)
2579                         goto fail;
2580         }
2581
2582         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2583         if (r < 0)
2584                 goto fail;
2585
2586         r = sd_event_add_monotonic(bus->event, 0, 0, time_callback, bus, &bus->time_event_source);
2587         if (r < 0)
2588                 goto fail;
2589
2590         r = sd_event_source_set_priority(bus->time_event_source, priority);
2591         if (r < 0)
2592                 goto fail;
2593
2594         r = sd_event_add_quit(bus->event, quit_callback, bus, &bus->quit_event_source);
2595         if (r < 0)
2596                 goto fail;
2597
2598         return 0;
2599
2600 fail:
2601         sd_bus_detach_event(bus);
2602         return r;
2603 }
2604
2605 _public_ int sd_bus_detach_event(sd_bus *bus) {
2606         assert_return(bus, -EINVAL);
2607         assert_return(bus->event, -ENXIO);
2608
2609         if (bus->input_io_event_source) {
2610                 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
2611                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2612         }
2613
2614         if (bus->output_io_event_source) {
2615                 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
2616                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2617         }
2618
2619         if (bus->time_event_source) {
2620                 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
2621                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2622         }
2623
2624         if (bus->quit_event_source) {
2625                 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
2626                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2627         }
2628
2629         if (bus->event)
2630                 bus->event = sd_event_unref(bus->event);
2631
2632         return 0;
2633 }
2634
2635 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
2636         assert_return(bus, NULL);
2637
2638         return bus->event;
2639 }
2640
2641 _public_ sd_bus_message* sd_bus_get_current(sd_bus *bus) {
2642         assert_return(bus, NULL);
2643
2644         return bus->current;
2645 }
2646
2647 static int bus_default(int (*bus_open)(sd_bus **), sd_bus **default_bus, sd_bus **ret) {
2648         sd_bus *b = NULL;
2649         int r;
2650
2651         assert(bus_open);
2652         assert(default_bus);
2653
2654         if (!ret)
2655                 return !!*default_bus;
2656
2657         if (*default_bus) {
2658                 *ret = sd_bus_ref(*default_bus);
2659                 return 0;
2660         }
2661
2662         r = bus_open(&b);
2663         if (r < 0)
2664                 return r;
2665
2666         b->default_bus_ptr = default_bus;
2667         b->tid = gettid();
2668         *default_bus = b;
2669
2670         *ret = b;
2671         return 1;
2672 }
2673
2674 _public_ int sd_bus_default_system(sd_bus **ret) {
2675         static __thread sd_bus *default_system_bus = NULL;
2676
2677         return bus_default(sd_bus_open_system, &default_system_bus, ret);
2678 }
2679
2680 _public_ int sd_bus_default_user(sd_bus **ret) {
2681         static __thread sd_bus *default_user_bus = NULL;
2682
2683         return bus_default(sd_bus_open_user, &default_user_bus, ret);
2684 }
2685
2686 _public_ int sd_bus_get_tid(sd_bus *b, pid_t *tid) {
2687         assert_return(b, -EINVAL);
2688         assert_return(tid, -EINVAL);
2689         assert_return(!bus_pid_changed(b), -ECHILD);
2690
2691         if (b->tid != 0) {
2692                 *tid = b->tid;
2693                 return 0;
2694         }
2695
2696         if (b->event)
2697                 return sd_event_get_tid(b->event, tid);
2698
2699         return -ENXIO;
2700 }
2701
2702 _public_ char *sd_bus_label_escape(const char *s) {
2703         char *r, *t;
2704         const char *f;
2705
2706         assert_return(s, NULL);
2707
2708         /* Escapes all chars that D-Bus' object path cannot deal
2709          * with. Can be reversed with bus_path_unescape(). We special
2710          * case the empty string. */
2711
2712         if (*s == 0)
2713                 return strdup("_");
2714
2715         r = new(char, strlen(s)*3 + 1);
2716         if (!r)
2717                 return NULL;
2718
2719         for (f = s, t = r; *f; f++) {
2720
2721                 /* Escape everything that is not a-zA-Z0-9. We also
2722                  * escape 0-9 if it's the first character */
2723
2724                 if (!(*f >= 'A' && *f <= 'Z') &&
2725                     !(*f >= 'a' && *f <= 'z') &&
2726                     !(f > s && *f >= '0' && *f <= '9')) {
2727                         *(t++) = '_';
2728                         *(t++) = hexchar(*f >> 4);
2729                         *(t++) = hexchar(*f);
2730                 } else
2731                         *(t++) = *f;
2732         }
2733
2734         *t = 0;
2735
2736         return r;
2737 }
2738
2739 _public_ char *sd_bus_label_unescape(const char *f) {
2740         char *r, *t;
2741
2742         assert_return(f, NULL);
2743
2744         /* Special case for the empty string */
2745         if (streq(f, "_"))
2746                 return strdup("");
2747
2748         r = new(char, strlen(f) + 1);
2749         if (!r)
2750                 return NULL;
2751
2752         for (t = r; *f; f++) {
2753
2754                 if (*f == '_') {
2755                         int a, b;
2756
2757                         if ((a = unhexchar(f[1])) < 0 ||
2758                             (b = unhexchar(f[2])) < 0) {
2759                                 /* Invalid escape code, let's take it literal then */
2760                                 *(t++) = '_';
2761                         } else {
2762                                 *(t++) = (char) ((a << 4) | b);
2763                                 f += 2;
2764                         }
2765                 } else
2766                         *(t++) = *f;
2767         }
2768
2769         *t = 0;
2770
2771         return r;
2772 }
2773
2774 _public_ int sd_bus_get_peer_creds(sd_bus *bus, uint64_t mask, sd_bus_creds **ret) {
2775         sd_bus_creds *c;
2776         pid_t pid = 0;
2777         int r;
2778
2779         assert_return(bus, -EINVAL);
2780         assert_return(mask <= _SD_BUS_CREDS_MAX, -ENOTSUP);
2781         assert_return(ret, -EINVAL);
2782         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2783         assert_return(!bus_pid_changed(bus), -ECHILD);
2784         assert_return(!bus->is_kernel, -ENOTSUP);
2785
2786         if (!bus->ucred_valid && !isempty(bus->label))
2787                 return -ENODATA;
2788
2789         c = bus_creds_new();
2790         if (!c)
2791                 return -ENOMEM;
2792
2793         if (bus->ucred_valid) {
2794                 pid = c->pid = bus->ucred.pid;
2795                 c->uid = bus->ucred.uid;
2796                 c->gid = bus->ucred.gid;
2797
2798                 c->mask |= ((SD_BUS_CREDS_UID | SD_BUS_CREDS_PID | SD_BUS_CREDS_GID) & mask) & bus->creds_mask;
2799         }
2800
2801         if (!isempty(bus->label) && (mask & SD_BUS_CREDS_SELINUX_CONTEXT)) {
2802                 c->label = strdup(bus->label);
2803                 if (!c->label) {
2804                         sd_bus_creds_unref(c);
2805                         return -ENOMEM;
2806                 }
2807
2808                 c->mask |= SD_BUS_CREDS_SELINUX_CONTEXT | bus->creds_mask;
2809         }
2810
2811         r = bus_creds_add_more(c, mask, pid, 0);
2812         if (r < 0)
2813                 return r;
2814
2815         *ret = c;
2816         return 0;
2817 }