chiark / gitweb /
turn kdbus support into a runtime option
[elogind.git] / src / libelogind / sd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 #include <netdb.h>
26 #include <poll.h>
27 #include <sys/mman.h>
28 #include <pthread.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "missing.h"
34 #include "def.h"
35 #include "cgroup-util.h"
36 #include "bus-label.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-objects.h"
46 #include "bus-util.h"
47 #include "bus-container.h"
48 #include "bus-protocol.h"
49 #include "bus-track.h"
50 #include "bus-slot.h"
51
52 #define log_debug_bus_message(m)                                         \
53         do {                                                             \
54                 sd_bus_message *_mm = (m);                               \
55                 log_debug("Got message type=%s sender=%s destination=%s object=%s interface=%s member=%s cookie=%" PRIu64 " reply_cookie=%" PRIu64 " error=%s", \
56                           bus_message_type_to_string(_mm->header->type), \
57                           strna(sd_bus_message_get_sender(_mm)),         \
58                           strna(sd_bus_message_get_destination(_mm)),    \
59                           strna(sd_bus_message_get_path(_mm)),           \
60                           strna(sd_bus_message_get_interface(_mm)),      \
61                           strna(sd_bus_message_get_member(_mm)),         \
62                           BUS_MESSAGE_COOKIE(_mm),                       \
63                           _mm->reply_cookie,                             \
64                           strna(_mm->error.message));                    \
65         } while (false)
66
67 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
68 static int attach_io_events(sd_bus *b);
69 static void detach_io_events(sd_bus *b);
70
71 static void bus_close_fds(sd_bus *b) {
72         assert(b);
73
74         detach_io_events(b);
75
76         if (b->input_fd >= 0)
77                 safe_close(b->input_fd);
78
79         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
80                 safe_close(b->output_fd);
81
82         b->input_fd = b->output_fd = -1;
83 }
84
85 static void bus_reset_queues(sd_bus *b) {
86         assert(b);
87
88         while (b->rqueue_size > 0)
89                 sd_bus_message_unref(b->rqueue[--b->rqueue_size]);
90
91         free(b->rqueue);
92         b->rqueue = NULL;
93         b->rqueue_allocated = 0;
94
95         while (b->wqueue_size > 0)
96                 sd_bus_message_unref(b->wqueue[--b->wqueue_size]);
97
98         free(b->wqueue);
99         b->wqueue = NULL;
100         b->wqueue_allocated = 0;
101 }
102
103 static void bus_free(sd_bus *b) {
104         sd_bus_slot *s;
105
106         assert(b);
107         assert(!b->track_queue);
108
109         b->state = BUS_CLOSED;
110
111         sd_bus_detach_event(b);
112
113         while ((s = b->slots)) {
114                 /* At this point only floating slots can still be
115                  * around, because the non-floating ones keep a
116                  * reference to the bus, and we thus couldn't be
117                  * destructing right now... We forcibly disconnect the
118                  * slots here, so that they still can be referenced by
119                  * apps, but are dead. */
120
121                 assert(s->floating);
122                 bus_slot_disconnect(s);
123                 sd_bus_slot_unref(s);
124         }
125
126         if (b->default_bus_ptr)
127                 *b->default_bus_ptr = NULL;
128
129         bus_close_fds(b);
130
131         if (b->kdbus_buffer)
132                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
133
134         free(b->label);
135         free(b->rbuffer);
136         free(b->unique_name);
137         free(b->auth_buffer);
138         free(b->address);
139         free(b->kernel);
140         free(b->machine);
141         free(b->fake_label);
142         free(b->cgroup_root);
143         free(b->description);
144
145         free(b->exec_path);
146         strv_free(b->exec_argv);
147
148         close_many(b->fds, b->n_fds);
149         free(b->fds);
150
151         bus_reset_queues(b);
152
153         ordered_hashmap_free_free(b->reply_callbacks);
154         prioq_free(b->reply_callbacks_prioq);
155
156         assert(b->match_callbacks.type == BUS_MATCH_ROOT);
157         bus_match_free(&b->match_callbacks);
158
159         hashmap_free_free(b->vtable_methods);
160         hashmap_free_free(b->vtable_properties);
161
162         assert(hashmap_isempty(b->nodes));
163         hashmap_free(b->nodes);
164
165         bus_kernel_flush_memfd(b);
166
167         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
168
169         free(b);
170 }
171
172 _public_ int sd_bus_new(sd_bus **ret) {
173         sd_bus *r;
174
175         assert_return(ret, -EINVAL);
176
177         r = new0(sd_bus, 1);
178         if (!r)
179                 return -ENOMEM;
180
181         r->n_ref = REFCNT_INIT;
182         r->input_fd = r->output_fd = -1;
183         r->message_version = 1;
184         r->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
185         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
186         r->attach_flags |= KDBUS_ATTACH_NAMES;
187         r->original_pid = getpid();
188
189         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
190
191         /* We guarantee that wqueue always has space for at least one
192          * entry */
193         if (!GREEDY_REALLOC(r->wqueue, r->wqueue_allocated, 1)) {
194                 free(r);
195                 return -ENOMEM;
196         }
197
198         *ret = r;
199         return 0;
200 }
201
202 _public_ int sd_bus_set_address(sd_bus *bus, const char *address) {
203         char *a;
204
205         assert_return(bus, -EINVAL);
206         assert_return(bus->state == BUS_UNSET, -EPERM);
207         assert_return(address, -EINVAL);
208         assert_return(!bus_pid_changed(bus), -ECHILD);
209
210         a = strdup(address);
211         if (!a)
212                 return -ENOMEM;
213
214         free(bus->address);
215         bus->address = a;
216
217         return 0;
218 }
219
220 _public_ int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
221         assert_return(bus, -EINVAL);
222         assert_return(bus->state == BUS_UNSET, -EPERM);
223         assert_return(input_fd >= 0, -EINVAL);
224         assert_return(output_fd >= 0, -EINVAL);
225         assert_return(!bus_pid_changed(bus), -ECHILD);
226
227         bus->input_fd = input_fd;
228         bus->output_fd = output_fd;
229         return 0;
230 }
231
232 _public_ int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
233         char *p, **a;
234
235         assert_return(bus, -EINVAL);
236         assert_return(bus->state == BUS_UNSET, -EPERM);
237         assert_return(path, -EINVAL);
238         assert_return(!strv_isempty(argv), -EINVAL);
239         assert_return(!bus_pid_changed(bus), -ECHILD);
240
241         p = strdup(path);
242         if (!p)
243                 return -ENOMEM;
244
245         a = strv_copy(argv);
246         if (!a) {
247                 free(p);
248                 return -ENOMEM;
249         }
250
251         free(bus->exec_path);
252         strv_free(bus->exec_argv);
253
254         bus->exec_path = p;
255         bus->exec_argv = a;
256
257         return 0;
258 }
259
260 _public_ int sd_bus_set_bus_client(sd_bus *bus, int b) {
261         assert_return(bus, -EINVAL);
262         assert_return(bus->state == BUS_UNSET, -EPERM);
263         assert_return(!bus_pid_changed(bus), -ECHILD);
264
265         bus->bus_client = !!b;
266         return 0;
267 }
268
269 _public_ int sd_bus_set_monitor(sd_bus *bus, int b) {
270         assert_return(bus, -EINVAL);
271         assert_return(bus->state == BUS_UNSET, -EPERM);
272         assert_return(!bus_pid_changed(bus), -ECHILD);
273
274         SET_FLAG(bus->hello_flags, KDBUS_HELLO_MONITOR, b);
275         return 0;
276 }
277
278 _public_ int sd_bus_negotiate_fds(sd_bus *bus, int b) {
279         assert_return(bus, -EINVAL);
280         assert_return(bus->state == BUS_UNSET, -EPERM);
281         assert_return(!bus_pid_changed(bus), -ECHILD);
282
283         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
284         return 0;
285 }
286
287 _public_ int sd_bus_negotiate_timestamp(sd_bus *bus, int b) {
288         uint64_t new_flags;
289         assert_return(bus, -EINVAL);
290         assert_return(!IN_SET(bus->state, BUS_CLOSING, BUS_CLOSED), -EPERM);
291         assert_return(!bus_pid_changed(bus), -ECHILD);
292
293         new_flags = bus->attach_flags;
294         SET_FLAG(new_flags, KDBUS_ATTACH_TIMESTAMP, b);
295
296         if (bus->attach_flags == new_flags)
297                 return 0;
298
299         bus->attach_flags = new_flags;
300         if (bus->state != BUS_UNSET && bus->is_kernel)
301                 bus_kernel_realize_attach_flags(bus);
302
303         return 0;
304 }
305
306 _public_ int sd_bus_negotiate_creds(sd_bus *bus, int b, uint64_t mask) {
307         uint64_t new_flags;
308
309         assert_return(bus, -EINVAL);
310         assert_return(mask <= _SD_BUS_CREDS_ALL, -EINVAL);
311         assert_return(!IN_SET(bus->state, BUS_CLOSING, BUS_CLOSED), -EPERM);
312         assert_return(!bus_pid_changed(bus), -ECHILD);
313
314         if (b)
315                 bus->creds_mask |= mask;
316         else
317                 bus->creds_mask &= ~mask;
318
319         /* The well knowns we need unconditionally, so that matches can work */
320         bus->creds_mask |= SD_BUS_CREDS_WELL_KNOWN_NAMES|SD_BUS_CREDS_UNIQUE_NAME;
321
322         /* Make sure we don't lose the timestamp flag */
323         new_flags = (bus->attach_flags & KDBUS_ATTACH_TIMESTAMP) | attach_flags_to_kdbus(bus->creds_mask);
324         if (bus->attach_flags == new_flags)
325                 return 0;
326
327         bus->attach_flags = new_flags;
328         if (bus->state != BUS_UNSET && bus->is_kernel)
329                 bus_kernel_realize_attach_flags(bus);
330
331         return 0;
332 }
333
334 _public_ int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
335         assert_return(bus, -EINVAL);
336         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
337         assert_return(bus->state == BUS_UNSET, -EPERM);
338         assert_return(!bus_pid_changed(bus), -ECHILD);
339
340         bus->is_server = !!b;
341         bus->server_id = server_id;
342         return 0;
343 }
344
345 _public_ int sd_bus_set_anonymous(sd_bus *bus, int b) {
346         assert_return(bus, -EINVAL);
347         assert_return(bus->state == BUS_UNSET, -EPERM);
348         assert_return(!bus_pid_changed(bus), -ECHILD);
349
350         bus->anonymous_auth = !!b;
351         return 0;
352 }
353
354 _public_ int sd_bus_set_trusted(sd_bus *bus, int b) {
355         assert_return(bus, -EINVAL);
356         assert_return(bus->state == BUS_UNSET, -EPERM);
357         assert_return(!bus_pid_changed(bus), -ECHILD);
358
359         bus->trusted = !!b;
360         return 0;
361 }
362
363 _public_ int sd_bus_set_description(sd_bus *bus, const char *description) {
364         assert_return(bus, -EINVAL);
365         assert_return(bus->state == BUS_UNSET, -EPERM);
366         assert_return(!bus_pid_changed(bus), -ECHILD);
367
368         return free_and_strdup(&bus->description, description);
369 }
370
371 _public_ int sd_bus_set_allow_interactive_authorization(sd_bus *bus, int b) {
372         assert_return(bus, -EINVAL);
373         assert_return(!bus_pid_changed(bus), -ECHILD);
374
375         bus->allow_interactive_authorization = !!b;
376         return 0;
377 }
378
379 _public_ int sd_bus_get_allow_interactive_authorization(sd_bus *bus) {
380         assert_return(bus, -EINVAL);
381         assert_return(!bus_pid_changed(bus), -ECHILD);
382
383         return bus->allow_interactive_authorization;
384 }
385
386 static int hello_callback(sd_bus_message *reply, void *userdata, sd_bus_error *error) {
387         const char *s;
388         sd_bus *bus;
389         int r;
390
391         assert(reply);
392         bus = reply->bus;
393         assert(bus);
394         assert(bus->state == BUS_HELLO || bus->state == BUS_CLOSING);
395
396         r = sd_bus_message_get_errno(reply);
397         if (r > 0)
398                 return -r;
399
400         r = sd_bus_message_read(reply, "s", &s);
401         if (r < 0)
402                 return r;
403
404         if (!service_name_is_valid(s) || s[0] != ':')
405                 return -EBADMSG;
406
407         bus->unique_name = strdup(s);
408         if (!bus->unique_name)
409                 return -ENOMEM;
410
411         if (bus->state == BUS_HELLO)
412                 bus->state = BUS_RUNNING;
413
414         return 1;
415 }
416
417 static int bus_send_hello(sd_bus *bus) {
418         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
419         int r;
420
421         assert(bus);
422
423         if (!bus->bus_client || bus->is_kernel)
424                 return 0;
425
426         r = sd_bus_message_new_method_call(
427                         bus,
428                         &m,
429                         "org.freedesktop.DBus",
430                         "/org/freedesktop/DBus",
431                         "org.freedesktop.DBus",
432                         "Hello");
433         if (r < 0)
434                 return r;
435
436         return sd_bus_call_async(bus, NULL, m, hello_callback, NULL, 0);
437 }
438
439 int bus_start_running(sd_bus *bus) {
440         assert(bus);
441
442         if (bus->bus_client && !bus->is_kernel) {
443                 bus->state = BUS_HELLO;
444                 return 1;
445         }
446
447         bus->state = BUS_RUNNING;
448         return 1;
449 }
450
451 static int parse_address_key(const char **p, const char *key, char **value) {
452         size_t l, n = 0, allocated = 0;
453         const char *a;
454         char *r = NULL;
455
456         assert(p);
457         assert(*p);
458         assert(value);
459
460         if (key) {
461                 l = strlen(key);
462                 if (strncmp(*p, key, l) != 0)
463                         return 0;
464
465                 if ((*p)[l] != '=')
466                         return 0;
467
468                 if (*value)
469                         return -EINVAL;
470
471                 a = *p + l + 1;
472         } else
473                 a = *p;
474
475         while (*a != ';' && *a != ',' && *a != 0) {
476                 char c;
477
478                 if (*a == '%') {
479                         int x, y;
480
481                         x = unhexchar(a[1]);
482                         if (x < 0) {
483                                 free(r);
484                                 return x;
485                         }
486
487                         y = unhexchar(a[2]);
488                         if (y < 0) {
489                                 free(r);
490                                 return y;
491                         }
492
493                         c = (char) ((x << 4) | y);
494                         a += 3;
495                 } else {
496                         c = *a;
497                         a++;
498                 }
499
500                 if (!GREEDY_REALLOC(r, allocated, n + 2))
501                         return -ENOMEM;
502
503                 r[n++] = c;
504         }
505
506         if (!r) {
507                 r = strdup("");
508                 if (!r)
509                         return -ENOMEM;
510         } else
511                 r[n] = 0;
512
513         if (*a == ',')
514                 a++;
515
516         *p = a;
517
518         free(*value);
519         *value = r;
520
521         return 1;
522 }
523
524 static void skip_address_key(const char **p) {
525         assert(p);
526         assert(*p);
527
528         *p += strcspn(*p, ",");
529
530         if (**p == ',')
531                 (*p) ++;
532 }
533
534 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
535         _cleanup_free_ char *path = NULL, *abstract = NULL;
536         size_t l;
537         int r;
538
539         assert(b);
540         assert(p);
541         assert(*p);
542         assert(guid);
543
544         while (**p != 0 && **p != ';') {
545                 r = parse_address_key(p, "guid", guid);
546                 if (r < 0)
547                         return r;
548                 else if (r > 0)
549                         continue;
550
551                 r = parse_address_key(p, "path", &path);
552                 if (r < 0)
553                         return r;
554                 else if (r > 0)
555                         continue;
556
557                 r = parse_address_key(p, "abstract", &abstract);
558                 if (r < 0)
559                         return r;
560                 else if (r > 0)
561                         continue;
562
563                 skip_address_key(p);
564         }
565
566         if (!path && !abstract)
567                 return -EINVAL;
568
569         if (path && abstract)
570                 return -EINVAL;
571
572         if (path) {
573                 l = strlen(path);
574                 if (l > sizeof(b->sockaddr.un.sun_path))
575                         return -E2BIG;
576
577                 b->sockaddr.un.sun_family = AF_UNIX;
578                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
579                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
580         } else if (abstract) {
581                 l = strlen(abstract);
582                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
583                         return -E2BIG;
584
585                 b->sockaddr.un.sun_family = AF_UNIX;
586                 b->sockaddr.un.sun_path[0] = 0;
587                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
588                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
589         }
590
591         return 0;
592 }
593
594 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
595         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
596         int r;
597         struct addrinfo *result, hints = {
598                 .ai_socktype = SOCK_STREAM,
599                 .ai_flags = AI_ADDRCONFIG,
600         };
601
602         assert(b);
603         assert(p);
604         assert(*p);
605         assert(guid);
606
607         while (**p != 0 && **p != ';') {
608                 r = parse_address_key(p, "guid", guid);
609                 if (r < 0)
610                         return r;
611                 else if (r > 0)
612                         continue;
613
614                 r = parse_address_key(p, "host", &host);
615                 if (r < 0)
616                         return r;
617                 else if (r > 0)
618                         continue;
619
620                 r = parse_address_key(p, "port", &port);
621                 if (r < 0)
622                         return r;
623                 else if (r > 0)
624                         continue;
625
626                 r = parse_address_key(p, "family", &family);
627                 if (r < 0)
628                         return r;
629                 else if (r > 0)
630                         continue;
631
632                 skip_address_key(p);
633         }
634
635         if (!host || !port)
636                 return -EINVAL;
637
638         if (family) {
639                 if (streq(family, "ipv4"))
640                         hints.ai_family = AF_INET;
641                 else if (streq(family, "ipv6"))
642                         hints.ai_family = AF_INET6;
643                 else
644                         return -EINVAL;
645         }
646
647         r = getaddrinfo(host, port, &hints, &result);
648         if (r == EAI_SYSTEM)
649                 return -errno;
650         else if (r != 0)
651                 return -EADDRNOTAVAIL;
652
653         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
654         b->sockaddr_size = result->ai_addrlen;
655
656         freeaddrinfo(result);
657
658         return 0;
659 }
660
661 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
662         char *path = NULL;
663         unsigned n_argv = 0, j;
664         char **argv = NULL;
665         size_t allocated = 0;
666         int r;
667
668         assert(b);
669         assert(p);
670         assert(*p);
671         assert(guid);
672
673         while (**p != 0 && **p != ';') {
674                 r = parse_address_key(p, "guid", guid);
675                 if (r < 0)
676                         goto fail;
677                 else if (r > 0)
678                         continue;
679
680                 r = parse_address_key(p, "path", &path);
681                 if (r < 0)
682                         goto fail;
683                 else if (r > 0)
684                         continue;
685
686                 if (startswith(*p, "argv")) {
687                         unsigned ul;
688
689                         errno = 0;
690                         ul = strtoul(*p + 4, (char**) p, 10);
691                         if (errno > 0 || **p != '=' || ul > 256) {
692                                 r = -EINVAL;
693                                 goto fail;
694                         }
695
696                         (*p) ++;
697
698                         if (ul >= n_argv) {
699                                 if (!GREEDY_REALLOC0(argv, allocated, ul + 2)) {
700                                         r = -ENOMEM;
701                                         goto fail;
702                                 }
703
704                                 n_argv = ul + 1;
705                         }
706
707                         r = parse_address_key(p, NULL, argv + ul);
708                         if (r < 0)
709                                 goto fail;
710
711                         continue;
712                 }
713
714                 skip_address_key(p);
715         }
716
717         if (!path) {
718                 r = -EINVAL;
719                 goto fail;
720         }
721
722         /* Make sure there are no holes in the array, with the
723          * exception of argv[0] */
724         for (j = 1; j < n_argv; j++)
725                 if (!argv[j]) {
726                         r = -EINVAL;
727                         goto fail;
728                 }
729
730         if (argv && argv[0] == NULL) {
731                 argv[0] = strdup(path);
732                 if (!argv[0]) {
733                         r = -ENOMEM;
734                         goto fail;
735                 }
736         }
737
738         b->exec_path = path;
739         b->exec_argv = argv;
740         return 0;
741
742 fail:
743         for (j = 0; j < n_argv; j++)
744                 free(argv[j]);
745
746         free(argv);
747         free(path);
748         return r;
749 }
750
751 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
752         _cleanup_free_ char *path = NULL;
753         int r;
754
755         assert(b);
756         assert(p);
757         assert(*p);
758         assert(guid);
759
760         while (**p != 0 && **p != ';') {
761                 r = parse_address_key(p, "guid", guid);
762                 if (r < 0)
763                         return r;
764                 else if (r > 0)
765                         continue;
766
767                 r = parse_address_key(p, "path", &path);
768                 if (r < 0)
769                         return r;
770                 else if (r > 0)
771                         continue;
772
773                 skip_address_key(p);
774         }
775
776         if (!path)
777                 return -EINVAL;
778
779         free(b->kernel);
780         b->kernel = path;
781         path = NULL;
782
783         return 0;
784 }
785
786 static int parse_container_unix_address(sd_bus *b, const char **p, char **guid) {
787         _cleanup_free_ char *machine = NULL, *pid = NULL;
788         int r;
789
790         assert(b);
791         assert(p);
792         assert(*p);
793         assert(guid);
794
795         while (**p != 0 && **p != ';') {
796                 r = parse_address_key(p, "guid", guid);
797                 if (r < 0)
798                         return r;
799                 else if (r > 0)
800                         continue;
801
802                 r = parse_address_key(p, "machine", &machine);
803                 if (r < 0)
804                         return r;
805                 else if (r > 0)
806                         continue;
807
808                 r = parse_address_key(p, "pid", &pid);
809                 if (r < 0)
810                         return r;
811                 else if (r > 0)
812                         continue;
813
814                 skip_address_key(p);
815         }
816
817         if (!machine == !pid)
818                 return -EINVAL;
819
820         if (machine) {
821                 if (!machine_name_is_valid(machine))
822                         return -EINVAL;
823
824                 free(b->machine);
825                 b->machine = machine;
826                 machine = NULL;
827         } else {
828                 free(b->machine);
829                 b->machine = NULL;
830         }
831
832         if (pid) {
833                 r = parse_pid(pid, &b->nspid);
834                 if (r < 0)
835                         return r;
836         } else
837                 b->nspid = 0;
838
839         b->sockaddr.un.sun_family = AF_UNIX;
840         strncpy(b->sockaddr.un.sun_path, "/var/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
841         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + strlen("/var/run/dbus/system_bus_socket");
842
843         return 0;
844 }
845
846 static int parse_container_kernel_address(sd_bus *b, const char **p, char **guid) {
847         _cleanup_free_ char *machine = NULL, *pid = NULL;
848         int r;
849
850         assert(b);
851         assert(p);
852         assert(*p);
853         assert(guid);
854
855         while (**p != 0 && **p != ';') {
856                 r = parse_address_key(p, "guid", guid);
857                 if (r < 0)
858                         return r;
859                 else if (r > 0)
860                         continue;
861
862                 r = parse_address_key(p, "machine", &machine);
863                 if (r < 0)
864                         return r;
865                 else if (r > 0)
866                         continue;
867
868                 r = parse_address_key(p, "pid", &pid);
869                 if (r < 0)
870                         return r;
871                 else if (r > 0)
872                         continue;
873
874                 skip_address_key(p);
875         }
876
877         if (!machine == !pid)
878                 return -EINVAL;
879
880         if (machine) {
881                 if (!machine_name_is_valid(machine))
882                         return -EINVAL;
883
884                 free(b->machine);
885                 b->machine = machine;
886                 machine = NULL;
887         } else {
888                 free(b->machine);
889                 b->machine = NULL;
890         }
891
892         if (pid) {
893                 r = parse_pid(pid, &b->nspid);
894                 if (r < 0)
895                         return r;
896         } else
897                 b->nspid = 0;
898
899         free(b->kernel);
900         b->kernel = strdup("/sys/fs/kdbus/0-system/bus");
901         if (!b->kernel)
902                 return -ENOMEM;
903
904         return 0;
905 }
906
907 static void bus_reset_parsed_address(sd_bus *b) {
908         assert(b);
909
910         zero(b->sockaddr);
911         b->sockaddr_size = 0;
912         strv_free(b->exec_argv);
913         free(b->exec_path);
914         b->exec_path = NULL;
915         b->exec_argv = NULL;
916         b->server_id = SD_ID128_NULL;
917         free(b->kernel);
918         b->kernel = NULL;
919         free(b->machine);
920         b->machine = NULL;
921         b->nspid = 0;
922 }
923
924 static int bus_parse_next_address(sd_bus *b) {
925         _cleanup_free_ char *guid = NULL;
926         const char *a;
927         int r;
928
929         assert(b);
930
931         if (!b->address)
932                 return 0;
933         if (b->address[b->address_index] == 0)
934                 return 0;
935
936         bus_reset_parsed_address(b);
937
938         a = b->address + b->address_index;
939
940         while (*a != 0) {
941
942                 if (*a == ';') {
943                         a++;
944                         continue;
945                 }
946
947                 if (startswith(a, "unix:")) {
948                         a += 5;
949
950                         r = parse_unix_address(b, &a, &guid);
951                         if (r < 0)
952                                 return r;
953                         break;
954
955                 } else if (startswith(a, "tcp:")) {
956
957                         a += 4;
958                         r = parse_tcp_address(b, &a, &guid);
959                         if (r < 0)
960                                 return r;
961
962                         break;
963
964                 } else if (startswith(a, "unixexec:")) {
965
966                         a += 9;
967                         r = parse_exec_address(b, &a, &guid);
968                         if (r < 0)
969                                 return r;
970
971                         break;
972
973                 } else if (startswith(a, "kernel:")) {
974
975                         a += 7;
976                         r = parse_kernel_address(b, &a, &guid);
977                         if (r < 0)
978                                 return r;
979
980                         break;
981                 } else if (startswith(a, "x-machine-unix:")) {
982
983                         a += 15;
984                         r = parse_container_unix_address(b, &a, &guid);
985                         if (r < 0)
986                                 return r;
987
988                         break;
989                 } else if (startswith(a, "x-machine-kernel:")) {
990
991                         a += 17;
992                         r = parse_container_kernel_address(b, &a, &guid);
993                         if (r < 0)
994                                 return r;
995
996                         break;
997                 }
998
999                 a = strchr(a, ';');
1000                 if (!a)
1001                         return 0;
1002         }
1003
1004         if (guid) {
1005                 r = sd_id128_from_string(guid, &b->server_id);
1006                 if (r < 0)
1007                         return r;
1008         }
1009
1010         b->address_index = a - b->address;
1011         return 1;
1012 }
1013
1014 static int bus_start_address(sd_bus *b) {
1015         int r;
1016
1017         assert(b);
1018
1019         for (;;) {
1020                 bool skipped = false;
1021
1022                 bus_close_fds(b);
1023
1024                 if (b->exec_path)
1025                         r = bus_socket_exec(b);
1026                 else if ((b->nspid > 0 || b->machine) && b->kernel)
1027                         r = bus_container_connect_kernel(b);
1028                 else if ((b->nspid > 0 || b->machine) && b->sockaddr.sa.sa_family != AF_UNSPEC)
1029                         r = bus_container_connect_socket(b);
1030                 else if (b->kernel)
1031                         r = bus_kernel_connect(b);
1032                 else if (b->sockaddr.sa.sa_family != AF_UNSPEC)
1033                         r = bus_socket_connect(b);
1034                 else
1035                         skipped = true;
1036
1037                 if (!skipped) {
1038                         if (r >= 0) {
1039                                 r = attach_io_events(b);
1040                                 if (r >= 0)
1041                                         return r;
1042                         }
1043
1044                         b->last_connect_error = -r;
1045                 }
1046
1047                 r = bus_parse_next_address(b);
1048                 if (r < 0)
1049                         return r;
1050                 if (r == 0)
1051                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
1052         }
1053 }
1054
1055 int bus_next_address(sd_bus *b) {
1056         assert(b);
1057
1058         bus_reset_parsed_address(b);
1059         return bus_start_address(b);
1060 }
1061
1062 static int bus_start_fd(sd_bus *b) {
1063         struct stat st;
1064         int r;
1065
1066         assert(b);
1067         assert(b->input_fd >= 0);
1068         assert(b->output_fd >= 0);
1069
1070         r = fd_nonblock(b->input_fd, true);
1071         if (r < 0)
1072                 return r;
1073
1074         r = fd_cloexec(b->input_fd, true);
1075         if (r < 0)
1076                 return r;
1077
1078         if (b->input_fd != b->output_fd) {
1079                 r = fd_nonblock(b->output_fd, true);
1080                 if (r < 0)
1081                         return r;
1082
1083                 r = fd_cloexec(b->output_fd, true);
1084                 if (r < 0)
1085                         return r;
1086         }
1087
1088         if (fstat(b->input_fd, &st) < 0)
1089                 return -errno;
1090
1091         if (S_ISCHR(b->input_fd))
1092                 return bus_kernel_take_fd(b);
1093         else
1094                 return bus_socket_take_fd(b);
1095 }
1096
1097 _public_ int sd_bus_start(sd_bus *bus) {
1098         int r;
1099
1100         assert_return(bus, -EINVAL);
1101         assert_return(bus->state == BUS_UNSET, -EPERM);
1102         assert_return(!bus_pid_changed(bus), -ECHILD);
1103
1104         bus->state = BUS_OPENING;
1105
1106         if (bus->is_server && bus->bus_client)
1107                 return -EINVAL;
1108
1109         if (bus->input_fd >= 0)
1110                 r = bus_start_fd(bus);
1111         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel || bus->machine)
1112                 r = bus_start_address(bus);
1113         else
1114                 return -EINVAL;
1115
1116         if (r < 0) {
1117                 sd_bus_close(bus);
1118                 return r;
1119         }
1120
1121         return bus_send_hello(bus);
1122 }
1123
1124 _public_ int sd_bus_open(sd_bus **ret) {
1125         const char *e;
1126         sd_bus *b;
1127         int r;
1128
1129         assert_return(ret, -EINVAL);
1130
1131         /* Let's connect to the starter bus if it is set, and
1132          * otherwise to the bus that is appropropriate for the scope
1133          * we are running in */
1134
1135         e = secure_getenv("DBUS_STARTER_BUS_TYPE");
1136         if (e) {
1137                 if (streq(e, "system"))
1138                         return sd_bus_open_system(ret);
1139                 else if (STR_IN_SET(e, "session", "user"))
1140                         return sd_bus_open_user(ret);
1141         }
1142
1143         e = secure_getenv("DBUS_STARTER_ADDRESS");
1144         if (!e) {
1145                 return sd_bus_open_system(ret);
1146         }
1147
1148         r = sd_bus_new(&b);
1149         if (r < 0)
1150                 return r;
1151
1152         r = sd_bus_set_address(b, e);
1153         if (r < 0)
1154                 goto fail;
1155
1156         b->bus_client = true;
1157
1158         /* We don't know whether the bus is trusted or not, so better
1159          * be safe, and authenticate everything */
1160         b->trusted = false;
1161         b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1162         b->creds_mask |= SD_BUS_CREDS_UID | SD_BUS_CREDS_EUID | SD_BUS_CREDS_EFFECTIVE_CAPS;
1163
1164         r = sd_bus_start(b);
1165         if (r < 0)
1166                 goto fail;
1167
1168         *ret = b;
1169         return 0;
1170
1171 fail:
1172         bus_free(b);
1173         return r;
1174 }
1175
1176 int bus_set_address_system(sd_bus *b) {
1177         const char *e;
1178         assert(b);
1179
1180         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1181         if (e)
1182                 return sd_bus_set_address(b, e);
1183
1184         return sd_bus_set_address(b, DEFAULT_SYSTEM_BUS_ADDRESS);
1185 }
1186
1187 _public_ int sd_bus_open_system(sd_bus **ret) {
1188         sd_bus *b;
1189         int r;
1190
1191         assert_return(ret, -EINVAL);
1192
1193         r = sd_bus_new(&b);
1194         if (r < 0)
1195                 return r;
1196
1197         r = bus_set_address_system(b);
1198         if (r < 0)
1199                 goto fail;
1200
1201         b->bus_client = true;
1202         b->is_system = true;
1203
1204         /* Let's do per-method access control on the system bus. We
1205          * need the caller's UID and capability set for that. */
1206         b->trusted = false;
1207         b->attach_flags |= KDBUS_ATTACH_CAPS | KDBUS_ATTACH_CREDS;
1208         b->creds_mask |= SD_BUS_CREDS_UID | SD_BUS_CREDS_EUID | SD_BUS_CREDS_EFFECTIVE_CAPS;
1209
1210         r = sd_bus_start(b);
1211         if (r < 0)
1212                 goto fail;
1213
1214         *ret = b;
1215         return 0;
1216
1217 fail:
1218         bus_free(b);
1219         return r;
1220 }
1221
1222 int bus_set_address_user(sd_bus *b) {
1223         const char *e;
1224
1225         assert(b);
1226
1227         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1228         if (e)
1229                 return sd_bus_set_address(b, e);
1230
1231         e = secure_getenv("XDG_RUNTIME_DIR");
1232         if (e) {
1233                 _cleanup_free_ char *ee = NULL;
1234
1235                 ee = bus_address_escape(e);
1236                 if (!ee)
1237                         return -ENOMEM;
1238
1239                 (void) asprintf(&b->address, KERNEL_USER_BUS_ADDRESS_FMT ";" UNIX_USER_BUS_ADDRESS_FMT, getuid(), ee);
1240         } else
1241                 (void) asprintf(&b->address, KERNEL_USER_BUS_ADDRESS_FMT, getuid());
1242
1243         if (!b->address)
1244                 return -ENOMEM;
1245
1246         return 0;
1247 }
1248
1249 _public_ int sd_bus_open_user(sd_bus **ret) {
1250         sd_bus *b;
1251         int r;
1252
1253         assert_return(ret, -EINVAL);
1254
1255         r = sd_bus_new(&b);
1256         if (r < 0)
1257                 return r;
1258
1259         r = bus_set_address_user(b);
1260         if (r < 0)
1261                 return r;
1262
1263         b->bus_client = true;
1264         b->is_user = true;
1265
1266         /* We don't do any per-method access control on the user
1267          * bus. */
1268         b->trusted = true;
1269
1270         r = sd_bus_start(b);
1271         if (r < 0)
1272                 goto fail;
1273
1274         *ret = b;
1275         return 0;
1276
1277 fail:
1278         bus_free(b);
1279         return r;
1280 }
1281
1282 int bus_set_address_system_remote(sd_bus *b, const char *host) {
1283         _cleanup_free_ char *e = NULL;
1284         char *m = NULL, *c = NULL;
1285
1286         assert(b);
1287         assert(host);
1288
1289         /* Let's see if we shall enter some container */
1290         m = strchr(host, ':');
1291         if (m) {
1292                 m++;
1293
1294                 /* Let's make sure this is not a port of some kind,
1295                  * and is a valid machine name. */
1296                 if (!in_charset(m, "0123456789") && machine_name_is_valid(m)) {
1297                         char *t;
1298
1299                         /* Cut out the host part */
1300                         t = strndupa(host, m - host - 1);
1301                         e = bus_address_escape(t);
1302                         if (!e)
1303                                 return -ENOMEM;
1304
1305                         c = strjoina(",argv4=--machine=", m);
1306                 }
1307         }
1308
1309         if (!e) {
1310                 e = bus_address_escape(host);
1311                 if (!e)
1312                         return -ENOMEM;
1313         }
1314
1315         b->address = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", c, NULL);
1316         if (!b->address)
1317                 return -ENOMEM;
1318
1319         return 0;
1320  }
1321
1322 _public_ int sd_bus_open_system_remote(sd_bus **ret, const char *host) {
1323         sd_bus *bus;
1324         int r;
1325
1326         assert_return(host, -EINVAL);
1327         assert_return(ret, -EINVAL);
1328
1329         r = sd_bus_new(&bus);
1330         if (r < 0)
1331                 return r;
1332
1333         r = bus_set_address_system_remote(bus, host);
1334         if (r < 0)
1335                 goto fail;
1336
1337         bus->bus_client = true;
1338         bus->trusted = false;
1339         bus->is_system = true;
1340
1341         r = sd_bus_start(bus);
1342         if (r < 0)
1343                 goto fail;
1344
1345         *ret = bus;
1346         return 0;
1347
1348 fail:
1349         bus_free(bus);
1350         return r;
1351 }
1352
1353 int bus_set_address_system_machine(sd_bus *b, const char *machine) {
1354         _cleanup_free_ char *e = NULL;
1355
1356         assert(b);
1357         assert(machine);
1358
1359         e = bus_address_escape(machine);
1360         if (!e)
1361                 return -ENOMEM;
1362
1363         b->address = strjoin("x-machine-kernel:machine=", e, ";x-machine-unix:machine=", e, NULL);
1364         if (!b->address)
1365                 return -ENOMEM;
1366
1367         return 0;
1368 }
1369
1370 _public_ int sd_bus_open_system_machine(sd_bus **ret, const char *machine) {
1371         sd_bus *bus;
1372         int r;
1373
1374         assert_return(machine, -EINVAL);
1375         assert_return(ret, -EINVAL);
1376         assert_return(machine_name_is_valid(machine), -EINVAL);
1377
1378         r = sd_bus_new(&bus);
1379         if (r < 0)
1380                 return r;
1381
1382         r = bus_set_address_system_machine(bus, machine);
1383         if (r < 0)
1384                 goto fail;
1385
1386         bus->bus_client = true;
1387         bus->trusted = false;
1388         bus->is_system = true;
1389
1390         r = sd_bus_start(bus);
1391         if (r < 0)
1392                 goto fail;
1393
1394         *ret = bus;
1395         return 0;
1396
1397 fail:
1398         bus_free(bus);
1399         return r;
1400 }
1401
1402 _public_ void sd_bus_close(sd_bus *bus) {
1403
1404         if (!bus)
1405                 return;
1406         if (bus->state == BUS_CLOSED)
1407                 return;
1408         if (bus_pid_changed(bus))
1409                 return;
1410
1411         bus->state = BUS_CLOSED;
1412
1413         sd_bus_detach_event(bus);
1414
1415         /* Drop all queued messages so that they drop references to
1416          * the bus object and the bus may be freed */
1417         bus_reset_queues(bus);
1418
1419         if (!bus->is_kernel)
1420                 bus_close_fds(bus);
1421
1422         /* We'll leave the fd open in case this is a kernel bus, since
1423          * there might still be memblocks around that reference this
1424          * bus, and they might need to invoke the KDBUS_CMD_FREE
1425          * ioctl on the fd when they are freed. */
1426 }
1427
1428 static void bus_enter_closing(sd_bus *bus) {
1429         assert(bus);
1430
1431         if (bus->state != BUS_OPENING &&
1432             bus->state != BUS_AUTHENTICATING &&
1433             bus->state != BUS_HELLO &&
1434             bus->state != BUS_RUNNING)
1435                 return;
1436
1437         bus->state = BUS_CLOSING;
1438 }
1439
1440 _public_ sd_bus *sd_bus_ref(sd_bus *bus) {
1441         assert_return(bus, NULL);
1442
1443         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1444
1445         return bus;
1446 }
1447
1448 _public_ sd_bus *sd_bus_unref(sd_bus *bus) {
1449         unsigned i;
1450
1451         if (!bus)
1452                 return NULL;
1453
1454         i = REFCNT_DEC(bus->n_ref);
1455         if (i > 0)
1456                 return NULL;
1457
1458         bus_free(bus);
1459         return NULL;
1460 }
1461
1462 _public_ int sd_bus_is_open(sd_bus *bus) {
1463
1464         assert_return(bus, -EINVAL);
1465         assert_return(!bus_pid_changed(bus), -ECHILD);
1466
1467         return BUS_IS_OPEN(bus->state);
1468 }
1469
1470 _public_ int sd_bus_can_send(sd_bus *bus, char type) {
1471         int r;
1472
1473         assert_return(bus, -EINVAL);
1474         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1475         assert_return(!bus_pid_changed(bus), -ECHILD);
1476
1477         if (bus->hello_flags & KDBUS_HELLO_MONITOR)
1478                 return 0;
1479
1480         if (type == SD_BUS_TYPE_UNIX_FD) {
1481                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1482                         return 0;
1483
1484                 r = bus_ensure_running(bus);
1485                 if (r < 0)
1486                         return r;
1487
1488                 return bus->can_fds;
1489         }
1490
1491         return bus_type_is_valid(type);
1492 }
1493
1494 _public_ int sd_bus_get_bus_id(sd_bus *bus, sd_id128_t *id) {
1495         int r;
1496
1497         assert_return(bus, -EINVAL);
1498         assert_return(id, -EINVAL);
1499         assert_return(!bus_pid_changed(bus), -ECHILD);
1500
1501         r = bus_ensure_running(bus);
1502         if (r < 0)
1503                 return r;
1504
1505         *id = bus->server_id;
1506         return 0;
1507 }
1508
1509 static int bus_seal_message(sd_bus *b, sd_bus_message *m, usec_t timeout) {
1510         assert(b);
1511         assert(m);
1512
1513         if (m->sealed) {
1514                 /* If we copy the same message to multiple
1515                  * destinations, avoid using the same cookie
1516                  * numbers. */
1517                 b->cookie = MAX(b->cookie, BUS_MESSAGE_COOKIE(m));
1518                 return 0;
1519         }
1520
1521         if (timeout == 0)
1522                 timeout = BUS_DEFAULT_TIMEOUT;
1523
1524         return bus_message_seal(m, ++b->cookie, timeout);
1525 }
1526
1527 static int bus_remarshal_message(sd_bus *b, sd_bus_message **m) {
1528         bool remarshal = false;
1529
1530         assert(b);
1531
1532         /* wrong packet version */
1533         if (b->message_version != 0 && b->message_version != (*m)->header->version)
1534                 remarshal = true;
1535
1536         /* wrong packet endianness */
1537         if (b->message_endian != 0 && b->message_endian != (*m)->header->endian)
1538                 remarshal = true;
1539
1540         /* TODO: kdbus-messages received from the kernel contain data which is
1541          * not allowed to be passed to KDBUS_CMD_SEND. Therefore, we have to
1542          * force remarshaling of the message. Technically, we could just
1543          * recreate the kdbus message, but that is non-trivial as other parts of
1544          * the message refer to m->kdbus already. This should be fixed! */
1545         if ((*m)->kdbus && (*m)->release_kdbus)
1546                 remarshal = true;
1547
1548         return remarshal ? bus_message_remarshal(b, m) : 0;
1549 }
1550
1551 int bus_seal_synthetic_message(sd_bus *b, sd_bus_message *m) {
1552         assert(b);
1553         assert(m);
1554
1555         /* Fake some timestamps, if they were requested, and not
1556          * already initialized */
1557         if (b->attach_flags & KDBUS_ATTACH_TIMESTAMP) {
1558                 if (m->realtime <= 0)
1559                         m->realtime = now(CLOCK_REALTIME);
1560
1561                 if (m->monotonic <= 0)
1562                         m->monotonic = now(CLOCK_MONOTONIC);
1563         }
1564
1565         /* The bus specification says the serial number cannot be 0,
1566          * hence let's fill something in for synthetic messages. Since
1567          * synthetic messages might have a fake sender and we don't
1568          * want to interfere with the real sender's serial numbers we
1569          * pick a fixed, artificial one. We use (uint32_t) -1 rather
1570          * than (uint64_t) -1 since dbus1 only had 32bit identifiers,
1571          * even though kdbus can do 64bit. */
1572         return bus_message_seal(m, 0xFFFFFFFFULL, 0);
1573 }
1574
1575 static int bus_write_message(sd_bus *bus, sd_bus_message *m, bool hint_sync_call, size_t *idx) {
1576         int r;
1577
1578         assert(bus);
1579         assert(m);
1580
1581         if (bus->is_kernel)
1582                 r = bus_kernel_write_message(bus, m, hint_sync_call);
1583         else
1584                 r = bus_socket_write_message(bus, m, idx);
1585
1586         if (r <= 0)
1587                 return r;
1588
1589         if (bus->is_kernel || *idx >= BUS_MESSAGE_SIZE(m))
1590                 log_debug("Sent message type=%s sender=%s destination=%s object=%s interface=%s member=%s cookie=%" PRIu64 " reply_cookie=%" PRIu64 " error=%s",
1591                           bus_message_type_to_string(m->header->type),
1592                           strna(sd_bus_message_get_sender(m)),
1593                           strna(sd_bus_message_get_destination(m)),
1594                           strna(sd_bus_message_get_path(m)),
1595                           strna(sd_bus_message_get_interface(m)),
1596                           strna(sd_bus_message_get_member(m)),
1597                           BUS_MESSAGE_COOKIE(m),
1598                           m->reply_cookie,
1599                           strna(m->error.message));
1600
1601         return r;
1602 }
1603
1604 static int dispatch_wqueue(sd_bus *bus) {
1605         int r, ret = 0;
1606
1607         assert(bus);
1608         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1609
1610         while (bus->wqueue_size > 0) {
1611
1612                 r = bus_write_message(bus, bus->wqueue[0], false, &bus->windex);
1613                 if (r < 0)
1614                         return r;
1615                 else if (r == 0)
1616                         /* Didn't do anything this time */
1617                         return ret;
1618                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1619                         /* Fully written. Let's drop the entry from
1620                          * the queue.
1621                          *
1622                          * This isn't particularly optimized, but
1623                          * well, this is supposed to be our worst-case
1624                          * buffer only, and the socket buffer is
1625                          * supposed to be our primary buffer, and if
1626                          * it got full, then all bets are off
1627                          * anyway. */
1628
1629                         bus->wqueue_size --;
1630                         sd_bus_message_unref(bus->wqueue[0]);
1631                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1632                         bus->windex = 0;
1633
1634                         ret = 1;
1635                 }
1636         }
1637
1638         return ret;
1639 }
1640
1641 static int bus_read_message(sd_bus *bus, bool hint_priority, int64_t priority) {
1642         assert(bus);
1643
1644         if (bus->is_kernel)
1645                 return bus_kernel_read_message(bus, hint_priority, priority);
1646         else
1647                 return bus_socket_read_message(bus);
1648 }
1649
1650 int bus_rqueue_make_room(sd_bus *bus) {
1651         assert(bus);
1652
1653         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1654                 return -ENOBUFS;
1655
1656         if (!GREEDY_REALLOC(bus->rqueue, bus->rqueue_allocated, bus->rqueue_size + 1))
1657                 return -ENOMEM;
1658
1659         return 0;
1660 }
1661
1662 static int dispatch_rqueue(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **m) {
1663         int r, ret = 0;
1664
1665         assert(bus);
1666         assert(m);
1667         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1668
1669         /* Note that the priority logic is only available on kdbus,
1670          * where the rqueue is unused. We check the rqueue here
1671          * anyway, because it's simple... */
1672
1673         for (;;) {
1674                 if (bus->rqueue_size > 0) {
1675                         /* Dispatch a queued message */
1676
1677                         *m = bus->rqueue[0];
1678                         bus->rqueue_size --;
1679                         memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1680                         return 1;
1681                 }
1682
1683                 /* Try to read a new message */
1684                 r = bus_read_message(bus, hint_priority, priority);
1685                 if (r < 0)
1686                         return r;
1687                 if (r == 0)
1688                         return ret;
1689
1690                 ret = 1;
1691         }
1692 }
1693
1694 static int bus_send_internal(sd_bus *bus, sd_bus_message *_m, uint64_t *cookie, bool hint_sync_call) {
1695         _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1696         int r;
1697
1698         assert_return(m, -EINVAL);
1699
1700         if (!bus)
1701                 bus = m->bus;
1702
1703         assert_return(!bus_pid_changed(bus), -ECHILD);
1704         assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1705
1706         if (!BUS_IS_OPEN(bus->state))
1707                 return -ENOTCONN;
1708
1709         if (m->n_fds > 0) {
1710                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1711                 if (r < 0)
1712                         return r;
1713                 if (r == 0)
1714                         return -EOPNOTSUPP;
1715         }
1716
1717         /* If the cookie number isn't kept, then we know that no reply
1718          * is expected */
1719         if (!cookie && !m->sealed)
1720                 m->header->flags |= BUS_MESSAGE_NO_REPLY_EXPECTED;
1721
1722         r = bus_seal_message(bus, m, 0);
1723         if (r < 0)
1724                 return r;
1725
1726         /* Remarshall if we have to. This will possibly unref the
1727          * message and place a replacement in m */
1728         r = bus_remarshal_message(bus, &m);
1729         if (r < 0)
1730                 return r;
1731
1732         /* If this is a reply and no reply was requested, then let's
1733          * suppress this, if we can */
1734         if (m->dont_send)
1735                 goto finish;
1736
1737         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1738                 size_t idx = 0;
1739
1740                 r = bus_write_message(bus, m, hint_sync_call, &idx);
1741                 if (r < 0) {
1742                         if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
1743                                 bus_enter_closing(bus);
1744                                 return -ECONNRESET;
1745                         }
1746
1747                         return r;
1748                 }
1749
1750                 if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1751                         /* Wasn't fully written. So let's remember how
1752                          * much was written. Note that the first entry
1753                          * of the wqueue array is always allocated so
1754                          * that we always can remember how much was
1755                          * written. */
1756                         bus->wqueue[0] = sd_bus_message_ref(m);
1757                         bus->wqueue_size = 1;
1758                         bus->windex = idx;
1759                 }
1760
1761         } else {
1762                 /* Just append it to the queue. */
1763
1764                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1765                         return -ENOBUFS;
1766
1767                 if (!GREEDY_REALLOC(bus->wqueue, bus->wqueue_allocated, bus->wqueue_size + 1))
1768                         return -ENOMEM;
1769
1770                 bus->wqueue[bus->wqueue_size ++] = sd_bus_message_ref(m);
1771         }
1772
1773 finish:
1774         if (cookie)
1775                 *cookie = BUS_MESSAGE_COOKIE(m);
1776
1777         return 1;
1778 }
1779
1780 _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *cookie) {
1781         return bus_send_internal(bus, m, cookie, false);
1782 }
1783
1784 _public_ int sd_bus_send_to(sd_bus *bus, sd_bus_message *m, const char *destination, uint64_t *cookie) {
1785         int r;
1786
1787         assert_return(m, -EINVAL);
1788
1789         if (!bus)
1790                 bus = m->bus;
1791
1792         assert_return(!bus_pid_changed(bus), -ECHILD);
1793
1794         if (!BUS_IS_OPEN(bus->state))
1795                 return -ENOTCONN;
1796
1797         if (!streq_ptr(m->destination, destination)) {
1798
1799                 if (!destination)
1800                         return -EEXIST;
1801
1802                 r = sd_bus_message_set_destination(m, destination);
1803                 if (r < 0)
1804                         return r;
1805         }
1806
1807         return sd_bus_send(bus, m, cookie);
1808 }
1809
1810 static usec_t calc_elapse(uint64_t usec) {
1811         if (usec == (uint64_t) -1)
1812                 return 0;
1813
1814         return now(CLOCK_MONOTONIC) + usec;
1815 }
1816
1817 static int timeout_compare(const void *a, const void *b) {
1818         const struct reply_callback *x = a, *y = b;
1819
1820         if (x->timeout != 0 && y->timeout == 0)
1821                 return -1;
1822
1823         if (x->timeout == 0 && y->timeout != 0)
1824                 return 1;
1825
1826         if (x->timeout < y->timeout)
1827                 return -1;
1828
1829         if (x->timeout > y->timeout)
1830                 return 1;
1831
1832         return 0;
1833 }
1834
1835 _public_ int sd_bus_call_async(
1836                 sd_bus *bus,
1837                 sd_bus_slot **slot,
1838                 sd_bus_message *_m,
1839                 sd_bus_message_handler_t callback,
1840                 void *userdata,
1841                 uint64_t usec) {
1842
1843         _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1844         _cleanup_bus_slot_unref_ sd_bus_slot *s = NULL;
1845         int r;
1846
1847         assert_return(m, -EINVAL);
1848         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1849         assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1850         assert_return(callback, -EINVAL);
1851
1852         if (!bus)
1853                 bus = m->bus;
1854
1855         assert_return(!bus_pid_changed(bus), -ECHILD);
1856         assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1857
1858         if (!BUS_IS_OPEN(bus->state))
1859                 return -ENOTCONN;
1860
1861         r = ordered_hashmap_ensure_allocated(&bus->reply_callbacks, &uint64_hash_ops);
1862         if (r < 0)
1863                 return r;
1864
1865         r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1866         if (r < 0)
1867                 return r;
1868
1869         r = bus_seal_message(bus, m, usec);
1870         if (r < 0)
1871                 return r;
1872
1873         r = bus_remarshal_message(bus, &m);
1874         if (r < 0)
1875                 return r;
1876
1877         s = bus_slot_allocate(bus, !slot, BUS_REPLY_CALLBACK, sizeof(struct reply_callback), userdata);
1878         if (!s)
1879                 return -ENOMEM;
1880
1881         s->reply_callback.callback = callback;
1882
1883         s->reply_callback.cookie = BUS_MESSAGE_COOKIE(m);
1884         r = ordered_hashmap_put(bus->reply_callbacks, &s->reply_callback.cookie, &s->reply_callback);
1885         if (r < 0) {
1886                 s->reply_callback.cookie = 0;
1887                 return r;
1888         }
1889
1890         s->reply_callback.timeout = calc_elapse(m->timeout);
1891         if (s->reply_callback.timeout != 0) {
1892                 r = prioq_put(bus->reply_callbacks_prioq, &s->reply_callback, &s->reply_callback.prioq_idx);
1893                 if (r < 0) {
1894                         s->reply_callback.timeout = 0;
1895                         return r;
1896                 }
1897         }
1898
1899         r = sd_bus_send(bus, m, &s->reply_callback.cookie);
1900         if (r < 0)
1901                 return r;
1902
1903         if (slot)
1904                 *slot = s;
1905         s = NULL;
1906
1907         return r;
1908 }
1909
1910 int bus_ensure_running(sd_bus *bus) {
1911         int r;
1912
1913         assert(bus);
1914
1915         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
1916                 return -ENOTCONN;
1917         if (bus->state == BUS_RUNNING)
1918                 return 1;
1919
1920         for (;;) {
1921                 r = sd_bus_process(bus, NULL);
1922                 if (r < 0)
1923                         return r;
1924                 if (bus->state == BUS_RUNNING)
1925                         return 1;
1926                 if (r > 0)
1927                         continue;
1928
1929                 r = sd_bus_wait(bus, (uint64_t) -1);
1930                 if (r < 0)
1931                         return r;
1932         }
1933 }
1934
1935 _public_ int sd_bus_call(
1936                 sd_bus *bus,
1937                 sd_bus_message *_m,
1938                 uint64_t usec,
1939                 sd_bus_error *error,
1940                 sd_bus_message **reply) {
1941
1942         _cleanup_bus_message_unref_ sd_bus_message *m = sd_bus_message_ref(_m);
1943         usec_t timeout;
1944         uint64_t cookie;
1945         unsigned i;
1946         int r;
1947
1948         assert_return(m, -EINVAL);
1949         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1950         assert_return(!(m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1951         assert_return(!bus_error_is_dirty(error), -EINVAL);
1952
1953         if (!bus)
1954                 bus = m->bus;
1955
1956         assert_return(!bus_pid_changed(bus), -ECHILD);
1957         assert_return(!bus->is_kernel || !(bus->hello_flags & KDBUS_HELLO_MONITOR), -EROFS);
1958
1959         if (!BUS_IS_OPEN(bus->state))
1960                 return -ENOTCONN;
1961
1962         r = bus_ensure_running(bus);
1963         if (r < 0)
1964                 return r;
1965
1966         i = bus->rqueue_size;
1967
1968         r = bus_seal_message(bus, m, usec);
1969         if (r < 0)
1970                 return r;
1971
1972         r = bus_remarshal_message(bus, &m);
1973         if (r < 0)
1974                 return r;
1975
1976         r = bus_send_internal(bus, m, &cookie, true);
1977         if (r < 0)
1978                 return r;
1979
1980         timeout = calc_elapse(m->timeout);
1981
1982         for (;;) {
1983                 usec_t left;
1984
1985                 while (i < bus->rqueue_size) {
1986                         sd_bus_message *incoming = NULL;
1987
1988                         incoming = bus->rqueue[i];
1989
1990                         if (incoming->reply_cookie == cookie) {
1991                                 /* Found a match! */
1992
1993                                 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
1994                                 bus->rqueue_size--;
1995                                 log_debug_bus_message(incoming);
1996
1997                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1998
1999                                         if (incoming->n_fds <= 0 || (bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)) {
2000                                                 if (reply)
2001                                                         *reply = incoming;
2002                                                 else
2003                                                         sd_bus_message_unref(incoming);
2004
2005                                                 return 1;
2006                                         }
2007
2008                                         r = sd_bus_error_setf(error, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptors which I couldn't accept. Sorry.");
2009
2010                                 } else if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR)
2011                                         r = sd_bus_error_copy(error, &incoming->error);
2012                                 else
2013                                         r = -EIO;
2014
2015                                 sd_bus_message_unref(incoming);
2016                                 return r;
2017
2018                         } else if (BUS_MESSAGE_COOKIE(incoming) == cookie &&
2019                                    bus->unique_name &&
2020                                    incoming->sender &&
2021                                    streq(bus->unique_name, incoming->sender)) {
2022
2023                                 memmove(bus->rqueue + i, bus->rqueue + i + 1, sizeof(sd_bus_message*) * (bus->rqueue_size - i - 1));
2024                                 bus->rqueue_size--;
2025
2026                                 /* Our own message? Somebody is trying
2027                                  * to send its own client a message,
2028                                  * let's not dead-lock, let's fail
2029                                  * immediately. */
2030
2031                                 sd_bus_message_unref(incoming);
2032                                 return -ELOOP;
2033                         }
2034
2035                         /* Try to read more, right-away */
2036                         i++;
2037                 }
2038
2039                 r = bus_read_message(bus, false, 0);
2040                 if (r < 0) {
2041                         if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2042                                 bus_enter_closing(bus);
2043                                 return -ECONNRESET;
2044                         }
2045
2046                         return r;
2047                 }
2048                 if (r > 0)
2049                         continue;
2050
2051                 if (timeout > 0) {
2052                         usec_t n;
2053
2054                         n = now(CLOCK_MONOTONIC);
2055                         if (n >= timeout)
2056                                 return -ETIMEDOUT;
2057
2058                         left = timeout - n;
2059                 } else
2060                         left = (uint64_t) -1;
2061
2062                 r = bus_poll(bus, true, left);
2063                 if (r < 0)
2064                         return r;
2065                 if (r == 0)
2066                         return -ETIMEDOUT;
2067
2068                 r = dispatch_wqueue(bus);
2069                 if (r < 0) {
2070                         if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2071                                 bus_enter_closing(bus);
2072                                 return -ECONNRESET;
2073                         }
2074
2075                         return r;
2076                 }
2077         }
2078 }
2079
2080 _public_ int sd_bus_get_fd(sd_bus *bus) {
2081
2082         assert_return(bus, -EINVAL);
2083         assert_return(bus->input_fd == bus->output_fd, -EPERM);
2084         assert_return(!bus_pid_changed(bus), -ECHILD);
2085
2086         return bus->input_fd;
2087 }
2088
2089 _public_ int sd_bus_get_events(sd_bus *bus) {
2090         int flags = 0;
2091
2092         assert_return(bus, -EINVAL);
2093         assert_return(!bus_pid_changed(bus), -ECHILD);
2094
2095         if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
2096                 return -ENOTCONN;
2097
2098         if (bus->state == BUS_OPENING)
2099                 flags |= POLLOUT;
2100         else if (bus->state == BUS_AUTHENTICATING) {
2101
2102                 if (bus_socket_auth_needs_write(bus))
2103                         flags |= POLLOUT;
2104
2105                 flags |= POLLIN;
2106
2107         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
2108                 if (bus->rqueue_size <= 0)
2109                         flags |= POLLIN;
2110                 if (bus->wqueue_size > 0)
2111                         flags |= POLLOUT;
2112         }
2113
2114         return flags;
2115 }
2116
2117 _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
2118         struct reply_callback *c;
2119
2120         assert_return(bus, -EINVAL);
2121         assert_return(timeout_usec, -EINVAL);
2122         assert_return(!bus_pid_changed(bus), -ECHILD);
2123
2124         if (!BUS_IS_OPEN(bus->state) && bus->state != BUS_CLOSING)
2125                 return -ENOTCONN;
2126
2127         if (bus->track_queue) {
2128                 *timeout_usec = 0;
2129                 return 1;
2130         }
2131
2132         if (bus->state == BUS_CLOSING) {
2133                 *timeout_usec = 0;
2134                 return 1;
2135         }
2136
2137         if (bus->state == BUS_AUTHENTICATING) {
2138                 *timeout_usec = bus->auth_timeout;
2139                 return 1;
2140         }
2141
2142         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
2143                 *timeout_usec = (uint64_t) -1;
2144                 return 0;
2145         }
2146
2147         if (bus->rqueue_size > 0) {
2148                 *timeout_usec = 0;
2149                 return 1;
2150         }
2151
2152         c = prioq_peek(bus->reply_callbacks_prioq);
2153         if (!c) {
2154                 *timeout_usec = (uint64_t) -1;
2155                 return 0;
2156         }
2157
2158         if (c->timeout == 0) {
2159                 *timeout_usec = (uint64_t) -1;
2160                 return 0;
2161         }
2162
2163         *timeout_usec = c->timeout;
2164         return 1;
2165 }
2166
2167 static int process_timeout(sd_bus *bus) {
2168         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2169         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
2170         struct reply_callback *c;
2171         sd_bus_slot *slot;
2172         usec_t n;
2173         int r;
2174
2175         assert(bus);
2176
2177         c = prioq_peek(bus->reply_callbacks_prioq);
2178         if (!c)
2179                 return 0;
2180
2181         n = now(CLOCK_MONOTONIC);
2182         if (c->timeout > n)
2183                 return 0;
2184
2185         r = bus_message_new_synthetic_error(
2186                         bus,
2187                         c->cookie,
2188                         &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
2189                         &m);
2190         if (r < 0)
2191                 return r;
2192
2193         r = bus_seal_synthetic_message(bus, m);
2194         if (r < 0)
2195                 return r;
2196
2197         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
2198         c->timeout = 0;
2199
2200         ordered_hashmap_remove(bus->reply_callbacks, &c->cookie);
2201         c->cookie = 0;
2202
2203         slot = container_of(c, sd_bus_slot, reply_callback);
2204
2205         bus->iteration_counter ++;
2206
2207         bus->current_message = m;
2208         bus->current_slot = sd_bus_slot_ref(slot);
2209         bus->current_handler = c->callback;
2210         bus->current_userdata = slot->userdata;
2211         r = c->callback(m, slot->userdata, &error_buffer);
2212         bus->current_userdata = NULL;
2213         bus->current_handler = NULL;
2214         bus->current_slot = NULL;
2215         bus->current_message = NULL;
2216
2217         if (slot->floating) {
2218                 bus_slot_disconnect(slot);
2219                 sd_bus_slot_unref(slot);
2220         }
2221
2222         sd_bus_slot_unref(slot);
2223
2224         return bus_maybe_reply_error(m, r, &error_buffer);
2225 }
2226
2227 static int process_hello(sd_bus *bus, sd_bus_message *m) {
2228         assert(bus);
2229         assert(m);
2230
2231         if (bus->state != BUS_HELLO)
2232                 return 0;
2233
2234         /* Let's make sure the first message on the bus is the HELLO
2235          * reply. But note that we don't actually parse the message
2236          * here (we leave that to the usual handling), we just verify
2237          * we don't let any earlier msg through. */
2238
2239         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
2240             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
2241                 return -EIO;
2242
2243         if (m->reply_cookie != 1)
2244                 return -EIO;
2245
2246         return 0;
2247 }
2248
2249 static int process_reply(sd_bus *bus, sd_bus_message *m) {
2250         _cleanup_bus_message_unref_ sd_bus_message *synthetic_reply = NULL;
2251         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2252         struct reply_callback *c;
2253         sd_bus_slot *slot;
2254         int r;
2255
2256         assert(bus);
2257         assert(m);
2258
2259         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
2260             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
2261                 return 0;
2262
2263         if (bus->is_kernel && (bus->hello_flags & KDBUS_HELLO_MONITOR))
2264                 return 0;
2265
2266         if (m->destination && bus->unique_name && !streq_ptr(m->destination, bus->unique_name))
2267                 return 0;
2268
2269         c = ordered_hashmap_remove(bus->reply_callbacks, &m->reply_cookie);
2270         if (!c)
2271                 return 0;
2272
2273         c->cookie = 0;
2274
2275         slot = container_of(c, sd_bus_slot, reply_callback);
2276
2277         if (m->n_fds > 0 && !(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)) {
2278
2279                 /* If the reply contained a file descriptor which we
2280                  * didn't want we pass an error instead. */
2281
2282                 r = bus_message_new_synthetic_error(
2283                                 bus,
2284                                 m->reply_cookie,
2285                                 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Reply message contained file descriptor"),
2286                                 &synthetic_reply);
2287                 if (r < 0)
2288                         return r;
2289
2290                 /* Copy over original timestamp */
2291                 synthetic_reply->realtime = m->realtime;
2292                 synthetic_reply->monotonic = m->monotonic;
2293                 synthetic_reply->seqnum = m->seqnum;
2294
2295                 r = bus_seal_synthetic_message(bus, synthetic_reply);
2296                 if (r < 0)
2297                         return r;
2298
2299                 m = synthetic_reply;
2300         } else {
2301                 r = sd_bus_message_rewind(m, true);
2302                 if (r < 0)
2303                         return r;
2304         }
2305
2306         if (c->timeout != 0) {
2307                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2308                 c->timeout = 0;
2309         }
2310
2311         bus->current_slot = sd_bus_slot_ref(slot);
2312         bus->current_handler = c->callback;
2313         bus->current_userdata = slot->userdata;
2314         r = c->callback(m, slot->userdata, &error_buffer);
2315         bus->current_userdata = NULL;
2316         bus->current_handler = NULL;
2317         bus->current_slot = NULL;
2318
2319         if (slot->floating) {
2320                 bus_slot_disconnect(slot);
2321                 sd_bus_slot_unref(slot);
2322         }
2323
2324         sd_bus_slot_unref(slot);
2325
2326         return bus_maybe_reply_error(m, r, &error_buffer);
2327 }
2328
2329 static int process_filter(sd_bus *bus, sd_bus_message *m) {
2330         _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2331         struct filter_callback *l;
2332         int r;
2333
2334         assert(bus);
2335         assert(m);
2336
2337         do {
2338                 bus->filter_callbacks_modified = false;
2339
2340                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
2341                         sd_bus_slot *slot;
2342
2343                         if (bus->filter_callbacks_modified)
2344                                 break;
2345
2346                         /* Don't run this more than once per iteration */
2347                         if (l->last_iteration == bus->iteration_counter)
2348                                 continue;
2349
2350                         l->last_iteration = bus->iteration_counter;
2351
2352                         r = sd_bus_message_rewind(m, true);
2353                         if (r < 0)
2354                                 return r;
2355
2356                         slot = container_of(l, sd_bus_slot, filter_callback);
2357
2358                         bus->current_slot = sd_bus_slot_ref(slot);
2359                         bus->current_handler = l->callback;
2360                         bus->current_userdata = slot->userdata;
2361                         r = l->callback(m, slot->userdata, &error_buffer);
2362                         bus->current_userdata = NULL;
2363                         bus->current_handler = NULL;
2364                         bus->current_slot = sd_bus_slot_unref(slot);
2365
2366                         r = bus_maybe_reply_error(m, r, &error_buffer);
2367                         if (r != 0)
2368                                 return r;
2369
2370                 }
2371
2372         } while (bus->filter_callbacks_modified);
2373
2374         return 0;
2375 }
2376
2377 static int process_match(sd_bus *bus, sd_bus_message *m) {
2378         int r;
2379
2380         assert(bus);
2381         assert(m);
2382
2383         do {
2384                 bus->match_callbacks_modified = false;
2385
2386                 r = bus_match_run(bus, &bus->match_callbacks, m);
2387                 if (r != 0)
2388                         return r;
2389
2390         } while (bus->match_callbacks_modified);
2391
2392         return 0;
2393 }
2394
2395 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
2396         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
2397         int r;
2398
2399         assert(bus);
2400         assert(m);
2401
2402         if (bus->hello_flags & KDBUS_HELLO_MONITOR)
2403                 return 0;
2404
2405         if (bus->manual_peer_interface)
2406                 return 0;
2407
2408         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2409                 return 0;
2410
2411         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
2412                 return 0;
2413
2414         if (m->header->flags & BUS_MESSAGE_NO_REPLY_EXPECTED)
2415                 return 1;
2416
2417         if (streq_ptr(m->member, "Ping"))
2418                 r = sd_bus_message_new_method_return(m, &reply);
2419         else if (streq_ptr(m->member, "GetMachineId")) {
2420                 sd_id128_t id;
2421                 char sid[33];
2422
2423                 r = sd_id128_get_machine(&id);
2424                 if (r < 0)
2425                         return r;
2426
2427                 r = sd_bus_message_new_method_return(m, &reply);
2428                 if (r < 0)
2429                         return r;
2430
2431                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
2432         } else {
2433                 r = sd_bus_message_new_method_errorf(
2434                                 m, &reply,
2435                                 SD_BUS_ERROR_UNKNOWN_METHOD,
2436                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
2437         }
2438
2439         if (r < 0)
2440                 return r;
2441
2442         r = sd_bus_send(bus, reply, NULL);
2443         if (r < 0)
2444                 return r;
2445
2446         return 1;
2447 }
2448
2449 static int process_fd_check(sd_bus *bus, sd_bus_message *m) {
2450         assert(bus);
2451         assert(m);
2452
2453         /* If we got a message with a file descriptor which we didn't
2454          * want to accept, then let's drop it. How can this even
2455          * happen? For example, when the kernel queues a message into
2456          * an activatable names's queue which allows fds, and then is
2457          * delivered to us later even though we ourselves did not
2458          * negotiate it. */
2459
2460         if (bus->hello_flags & KDBUS_HELLO_MONITOR)
2461                 return 0;
2462
2463         if (m->n_fds <= 0)
2464                 return 0;
2465
2466         if (bus->hello_flags & KDBUS_HELLO_ACCEPT_FD)
2467                 return 0;
2468
2469         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
2470                 return 1; /* just eat it up */
2471
2472         return sd_bus_reply_method_errorf(m, SD_BUS_ERROR_INCONSISTENT_MESSAGE, "Message contains file descriptors, which I cannot accept. Sorry.");
2473 }
2474
2475 static int process_message(sd_bus *bus, sd_bus_message *m) {
2476         int r;
2477
2478         assert(bus);
2479         assert(m);
2480
2481         bus->current_message = m;
2482         bus->iteration_counter++;
2483
2484         log_debug_bus_message(m);
2485
2486         r = process_hello(bus, m);
2487         if (r != 0)
2488                 goto finish;
2489
2490         r = process_reply(bus, m);
2491         if (r != 0)
2492                 goto finish;
2493
2494         r = process_fd_check(bus, m);
2495         if (r != 0)
2496                 goto finish;
2497
2498         r = process_filter(bus, m);
2499         if (r != 0)
2500                 goto finish;
2501
2502         r = process_match(bus, m);
2503         if (r != 0)
2504                 goto finish;
2505
2506         r = process_builtin(bus, m);
2507         if (r != 0)
2508                 goto finish;
2509
2510         r = bus_process_object(bus, m);
2511
2512 finish:
2513         bus->current_message = NULL;
2514         return r;
2515 }
2516
2517 static int dispatch_track(sd_bus *bus) {
2518         assert(bus);
2519
2520         if (!bus->track_queue)
2521                 return 0;
2522
2523         bus_track_dispatch(bus->track_queue);
2524         return 1;
2525 }
2526
2527 static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
2528         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2529         int r;
2530
2531         assert(bus);
2532         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
2533
2534         r = process_timeout(bus);
2535         if (r != 0)
2536                 goto null_message;
2537
2538         r = dispatch_wqueue(bus);
2539         if (r != 0)
2540                 goto null_message;
2541
2542         r = dispatch_track(bus);
2543         if (r != 0)
2544                 goto null_message;
2545
2546         r = dispatch_rqueue(bus, hint_priority, priority, &m);
2547         if (r < 0)
2548                 return r;
2549         if (!m)
2550                 goto null_message;
2551
2552         r = process_message(bus, m);
2553         if (r != 0)
2554                 goto null_message;
2555
2556         if (ret) {
2557                 r = sd_bus_message_rewind(m, true);
2558                 if (r < 0)
2559                         return r;
2560
2561                 *ret = m;
2562                 m = NULL;
2563                 return 1;
2564         }
2565
2566         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
2567
2568                 log_debug("Unprocessed message call sender=%s object=%s interface=%s member=%s",
2569                           strna(sd_bus_message_get_sender(m)),
2570                           strna(sd_bus_message_get_path(m)),
2571                           strna(sd_bus_message_get_interface(m)),
2572                           strna(sd_bus_message_get_member(m)));
2573
2574                 r = sd_bus_reply_method_errorf(
2575                                 m,
2576                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
2577                                 "Unknown object '%s'.", m->path);
2578                 if (r < 0)
2579                         return r;
2580         }
2581
2582         return 1;
2583
2584 null_message:
2585         if (r >= 0 && ret)
2586                 *ret = NULL;
2587
2588         return r;
2589 }
2590
2591 static int process_closing(sd_bus *bus, sd_bus_message **ret) {
2592         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
2593         struct reply_callback *c;
2594         int r;
2595
2596         assert(bus);
2597         assert(bus->state == BUS_CLOSING);
2598
2599         c = ordered_hashmap_first(bus->reply_callbacks);
2600         if (c) {
2601                 _cleanup_bus_error_free_ sd_bus_error error_buffer = SD_BUS_ERROR_NULL;
2602                 sd_bus_slot *slot;
2603
2604                 /* First, fail all outstanding method calls */
2605                 r = bus_message_new_synthetic_error(
2606                                 bus,
2607                                 c->cookie,
2608                                 &SD_BUS_ERROR_MAKE_CONST(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
2609                                 &m);
2610                 if (r < 0)
2611                         return r;
2612
2613                 r = bus_seal_synthetic_message(bus, m);
2614                 if (r < 0)
2615                         return r;
2616
2617                 if (c->timeout != 0) {
2618                         prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
2619                         c->timeout = 0;
2620                 }
2621
2622                 ordered_hashmap_remove(bus->reply_callbacks, &c->cookie);
2623                 c->cookie = 0;
2624
2625                 slot = container_of(c, sd_bus_slot, reply_callback);
2626
2627                 bus->iteration_counter++;
2628
2629                 bus->current_message = m;
2630                 bus->current_slot = sd_bus_slot_ref(slot);
2631                 bus->current_handler = c->callback;
2632                 bus->current_userdata = slot->userdata;
2633                 r = c->callback(m, slot->userdata, &error_buffer);
2634                 bus->current_userdata = NULL;
2635                 bus->current_handler = NULL;
2636                 bus->current_slot = NULL;
2637                 bus->current_message = NULL;
2638
2639                 if (slot->floating) {
2640                         bus_slot_disconnect(slot);
2641                         sd_bus_slot_unref(slot);
2642                 }
2643
2644                 sd_bus_slot_unref(slot);
2645
2646                 return bus_maybe_reply_error(m, r, &error_buffer);
2647         }
2648
2649         /* Then, synthesize a Disconnected message */
2650         r = sd_bus_message_new_signal(
2651                         bus,
2652                         &m,
2653                         "/org/freedesktop/DBus/Local",
2654                         "org.freedesktop.DBus.Local",
2655                         "Disconnected");
2656         if (r < 0)
2657                 return r;
2658
2659         bus_message_set_sender_local(bus, m);
2660
2661         r = bus_seal_synthetic_message(bus, m);
2662         if (r < 0)
2663                 return r;
2664
2665         sd_bus_close(bus);
2666
2667         bus->current_message = m;
2668         bus->iteration_counter++;
2669
2670         r = process_filter(bus, m);
2671         if (r != 0)
2672                 goto finish;
2673
2674         r = process_match(bus, m);
2675         if (r != 0)
2676                 goto finish;
2677
2678         if (ret) {
2679                 *ret = m;
2680                 m = NULL;
2681         }
2682
2683         r = 1;
2684
2685 finish:
2686         bus->current_message = NULL;
2687
2688         return r;
2689 }
2690
2691 static int bus_process_internal(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
2692         BUS_DONT_DESTROY(bus);
2693         int r;
2694
2695         /* Returns 0 when we didn't do anything. This should cause the
2696          * caller to invoke sd_bus_wait() before returning the next
2697          * time. Returns > 0 when we did something, which possibly
2698          * means *ret is filled in with an unprocessed message. */
2699
2700         assert_return(bus, -EINVAL);
2701         assert_return(!bus_pid_changed(bus), -ECHILD);
2702
2703         /* We don't allow recursively invoking sd_bus_process(). */
2704         assert_return(!bus->current_message, -EBUSY);
2705         assert(!bus->current_slot);
2706
2707         switch (bus->state) {
2708
2709         case BUS_UNSET:
2710                 return -ENOTCONN;
2711
2712         case BUS_CLOSED:
2713                 return -ECONNRESET;
2714
2715         case BUS_OPENING:
2716                 r = bus_socket_process_opening(bus);
2717                 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2718                         bus_enter_closing(bus);
2719                         r = 1;
2720                 } else if (r < 0)
2721                         return r;
2722                 if (ret)
2723                         *ret = NULL;
2724                 return r;
2725
2726         case BUS_AUTHENTICATING:
2727                 r = bus_socket_process_authenticating(bus);
2728                 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2729                         bus_enter_closing(bus);
2730                         r = 1;
2731                 } else if (r < 0)
2732                         return r;
2733
2734                 if (ret)
2735                         *ret = NULL;
2736
2737                 return r;
2738
2739         case BUS_RUNNING:
2740         case BUS_HELLO:
2741                 r = process_running(bus, hint_priority, priority, ret);
2742                 if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2743                         bus_enter_closing(bus);
2744                         r = 1;
2745
2746                         if (ret)
2747                                 *ret = NULL;
2748                 }
2749
2750                 return r;
2751
2752         case BUS_CLOSING:
2753                 return process_closing(bus, ret);
2754         }
2755
2756         assert_not_reached("Unknown state");
2757 }
2758
2759 _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
2760         return bus_process_internal(bus, false, 0, ret);
2761 }
2762
2763 _public_ int sd_bus_process_priority(sd_bus *bus, int64_t priority, sd_bus_message **ret) {
2764         return bus_process_internal(bus, true, priority, ret);
2765 }
2766
2767 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2768         struct pollfd p[2] = {};
2769         int r, e, n;
2770         struct timespec ts;
2771         usec_t m = USEC_INFINITY;
2772
2773         assert(bus);
2774
2775         if (bus->state == BUS_CLOSING)
2776                 return 1;
2777
2778         if (!BUS_IS_OPEN(bus->state))
2779                 return -ENOTCONN;
2780
2781         e = sd_bus_get_events(bus);
2782         if (e < 0)
2783                 return e;
2784
2785         if (need_more)
2786                 /* The caller really needs some more data, he doesn't
2787                  * care about what's already read, or any timeouts
2788                  * except its own. */
2789                 e |= POLLIN;
2790         else {
2791                 usec_t until;
2792                 /* The caller wants to process if there's something to
2793                  * process, but doesn't care otherwise */
2794
2795                 r = sd_bus_get_timeout(bus, &until);
2796                 if (r < 0)
2797                         return r;
2798                 if (r > 0) {
2799                         usec_t nw;
2800                         nw = now(CLOCK_MONOTONIC);
2801                         m = until > nw ? until - nw : 0;
2802                 }
2803         }
2804
2805         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2806                 m = timeout_usec;
2807
2808         p[0].fd = bus->input_fd;
2809         if (bus->output_fd == bus->input_fd) {
2810                 p[0].events = e;
2811                 n = 1;
2812         } else {
2813                 p[0].events = e & POLLIN;
2814                 p[1].fd = bus->output_fd;
2815                 p[1].events = e & POLLOUT;
2816                 n = 2;
2817         }
2818
2819         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2820         if (r < 0)
2821                 return -errno;
2822
2823         return r > 0 ? 1 : 0;
2824 }
2825
2826 _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2827
2828         assert_return(bus, -EINVAL);
2829         assert_return(!bus_pid_changed(bus), -ECHILD);
2830
2831         if (bus->state == BUS_CLOSING)
2832                 return 0;
2833
2834         if (!BUS_IS_OPEN(bus->state))
2835                 return -ENOTCONN;
2836
2837         if (bus->rqueue_size > 0)
2838                 return 0;
2839
2840         return bus_poll(bus, false, timeout_usec);
2841 }
2842
2843 _public_ int sd_bus_flush(sd_bus *bus) {
2844         int r;
2845
2846         assert_return(bus, -EINVAL);
2847         assert_return(!bus_pid_changed(bus), -ECHILD);
2848
2849         if (bus->state == BUS_CLOSING)
2850                 return 0;
2851
2852         if (!BUS_IS_OPEN(bus->state))
2853                 return -ENOTCONN;
2854
2855         r = bus_ensure_running(bus);
2856         if (r < 0)
2857                 return r;
2858
2859         if (bus->wqueue_size <= 0)
2860                 return 0;
2861
2862         for (;;) {
2863                 r = dispatch_wqueue(bus);
2864                 if (r < 0) {
2865                         if (r == -ENOTCONN || r == -ECONNRESET || r == -EPIPE || r == -ESHUTDOWN) {
2866                                 bus_enter_closing(bus);
2867                                 return -ECONNRESET;
2868                         }
2869
2870                         return r;
2871                 }
2872
2873                 if (bus->wqueue_size <= 0)
2874                         return 0;
2875
2876                 r = bus_poll(bus, false, (uint64_t) -1);
2877                 if (r < 0)
2878                         return r;
2879         }
2880 }
2881
2882 _public_ int sd_bus_add_filter(
2883                 sd_bus *bus,
2884                 sd_bus_slot **slot,
2885                 sd_bus_message_handler_t callback,
2886                 void *userdata) {
2887
2888         sd_bus_slot *s;
2889
2890         assert_return(bus, -EINVAL);
2891         assert_return(callback, -EINVAL);
2892         assert_return(!bus_pid_changed(bus), -ECHILD);
2893
2894         s = bus_slot_allocate(bus, !slot, BUS_FILTER_CALLBACK, sizeof(struct filter_callback), userdata);
2895         if (!s)
2896                 return -ENOMEM;
2897
2898         s->filter_callback.callback = callback;
2899
2900         bus->filter_callbacks_modified = true;
2901         LIST_PREPEND(callbacks, bus->filter_callbacks, &s->filter_callback);
2902
2903         if (slot)
2904                 *slot = s;
2905
2906         return 0;
2907 }
2908
2909 _public_ int sd_bus_add_match(
2910                 sd_bus *bus,
2911                 sd_bus_slot **slot,
2912                 const char *match,
2913                 sd_bus_message_handler_t callback,
2914                 void *userdata) {
2915
2916         struct bus_match_component *components = NULL;
2917         unsigned n_components = 0;
2918         sd_bus_slot *s = NULL;
2919         int r = 0;
2920
2921         assert_return(bus, -EINVAL);
2922         assert_return(match, -EINVAL);
2923         assert_return(!bus_pid_changed(bus), -ECHILD);
2924
2925         r = bus_match_parse(match, &components, &n_components);
2926         if (r < 0)
2927                 goto finish;
2928
2929         s = bus_slot_allocate(bus, !slot, BUS_MATCH_CALLBACK, sizeof(struct match_callback), userdata);
2930         if (!s) {
2931                 r = -ENOMEM;
2932                 goto finish;
2933         }
2934
2935         s->match_callback.callback = callback;
2936         s->match_callback.cookie = ++bus->match_cookie;
2937
2938         if (bus->bus_client) {
2939                 enum bus_match_scope scope;
2940
2941                 scope = bus_match_get_scope(components, n_components);
2942
2943                 /* Do not install server-side matches for matches
2944                  * against the local service, interface or bus
2945                  * path. Also, when on kdbus don't install driver
2946                  * matches server side. */
2947                 if (scope == BUS_MATCH_GENERIC ||
2948                     (!bus->is_kernel && scope == BUS_MATCH_DRIVER)) {
2949
2950                         if (!bus->is_kernel) {
2951                                 /* When this is not a kernel transport, we
2952                                  * store the original match string, so that we
2953                                  * can use it to remove the match again */
2954
2955                                 s->match_callback.match_string = strdup(match);
2956                                 if (!s->match_callback.match_string) {
2957                                         r = -ENOMEM;
2958                                         goto finish;
2959                                 }
2960                         }
2961
2962                         r = bus_add_match_internal(bus, s->match_callback.match_string, components, n_components, s->match_callback.cookie);
2963                         if (r < 0)
2964                                 goto finish;
2965
2966                         s->match_added = true;
2967                 }
2968         }
2969
2970         bus->match_callbacks_modified = true;
2971         r = bus_match_add(&bus->match_callbacks, components, n_components, &s->match_callback);
2972         if (r < 0)
2973                 goto finish;
2974
2975         if (slot)
2976                 *slot = s;
2977         s = NULL;
2978
2979 finish:
2980         bus_match_parse_free(components, n_components);
2981         sd_bus_slot_unref(s);
2982
2983         return r;
2984 }
2985
2986 int bus_remove_match_by_string(
2987                 sd_bus *bus,
2988                 const char *match,
2989                 sd_bus_message_handler_t callback,
2990                 void *userdata) {
2991
2992         struct bus_match_component *components = NULL;
2993         unsigned n_components = 0;
2994         struct match_callback *c;
2995         int r = 0;
2996
2997         assert_return(bus, -EINVAL);
2998         assert_return(match, -EINVAL);
2999         assert_return(!bus_pid_changed(bus), -ECHILD);
3000
3001         r = bus_match_parse(match, &components, &n_components);
3002         if (r < 0)
3003                 goto finish;
3004
3005         r = bus_match_find(&bus->match_callbacks, components, n_components, NULL, NULL, &c);
3006         if (r <= 0)
3007                 goto finish;
3008
3009         sd_bus_slot_unref(container_of(c, sd_bus_slot, match_callback));
3010
3011 finish:
3012         bus_match_parse_free(components, n_components);
3013
3014         return r;
3015 }
3016
3017 bool bus_pid_changed(sd_bus *bus) {
3018         assert(bus);
3019
3020         /* We don't support people creating a bus connection and
3021          * keeping it around over a fork(). Let's complain. */
3022
3023         return bus->original_pid != getpid();
3024 }
3025
3026 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
3027         sd_bus *bus = userdata;
3028         int r;
3029
3030         assert(bus);
3031
3032         r = sd_bus_process(bus, NULL);
3033         if (r < 0)
3034                 return r;
3035
3036         return 1;
3037 }
3038
3039 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
3040         sd_bus *bus = userdata;
3041         int r;
3042
3043         assert(bus);
3044
3045         r = sd_bus_process(bus, NULL);
3046         if (r < 0)
3047                 return r;
3048
3049         return 1;
3050 }
3051
3052 static int prepare_callback(sd_event_source *s, void *userdata) {
3053         sd_bus *bus = userdata;
3054         int r, e;
3055         usec_t until;
3056
3057         assert(s);
3058         assert(bus);
3059
3060         e = sd_bus_get_events(bus);
3061         if (e < 0)
3062                 return e;
3063
3064         if (bus->output_fd != bus->input_fd) {
3065
3066                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
3067                 if (r < 0)
3068                         return r;
3069
3070                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
3071                 if (r < 0)
3072                         return r;
3073         } else {
3074                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
3075                 if (r < 0)
3076                         return r;
3077         }
3078
3079         r = sd_bus_get_timeout(bus, &until);
3080         if (r < 0)
3081                 return r;
3082         if (r > 0) {
3083                 int j;
3084
3085                 j = sd_event_source_set_time(bus->time_event_source, until);
3086                 if (j < 0)
3087                         return j;
3088         }
3089
3090         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
3091         if (r < 0)
3092                 return r;
3093
3094         return 1;
3095 }
3096
3097 static int quit_callback(sd_event_source *event, void *userdata) {
3098         sd_bus *bus = userdata;
3099
3100         assert(event);
3101
3102         sd_bus_flush(bus);
3103         sd_bus_close(bus);
3104
3105         return 1;
3106 }
3107
3108 static int attach_io_events(sd_bus *bus) {
3109         int r;
3110
3111         assert(bus);
3112
3113         if (bus->input_fd < 0)
3114                 return 0;
3115
3116         if (!bus->event)
3117                 return 0;
3118
3119         if (!bus->input_io_event_source) {
3120                 r = sd_event_add_io(bus->event, &bus->input_io_event_source, bus->input_fd, 0, io_callback, bus);
3121                 if (r < 0)
3122                         return r;
3123
3124                 r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
3125                 if (r < 0)
3126                         return r;
3127
3128                 r = sd_event_source_set_priority(bus->input_io_event_source, bus->event_priority);
3129                 if (r < 0)
3130                         return r;
3131
3132                 r = sd_event_source_set_description(bus->input_io_event_source, "bus-input");
3133         } else
3134                 r = sd_event_source_set_io_fd(bus->input_io_event_source, bus->input_fd);
3135
3136         if (r < 0)
3137                 return r;
3138
3139         if (bus->output_fd != bus->input_fd) {
3140                 assert(bus->output_fd >= 0);
3141
3142                 if (!bus->output_io_event_source) {
3143                         r = sd_event_add_io(bus->event, &bus->output_io_event_source, bus->output_fd, 0, io_callback, bus);
3144                         if (r < 0)
3145                                 return r;
3146
3147                         r = sd_event_source_set_priority(bus->output_io_event_source, bus->event_priority);
3148                         if (r < 0)
3149                                 return r;
3150
3151                         r = sd_event_source_set_description(bus->input_io_event_source, "bus-output");
3152                 } else
3153                         r = sd_event_source_set_io_fd(bus->output_io_event_source, bus->output_fd);
3154
3155                 if (r < 0)
3156                         return r;
3157         }
3158
3159         return 0;
3160 }
3161
3162 static void detach_io_events(sd_bus *bus) {
3163         assert(bus);
3164
3165         if (bus->input_io_event_source) {
3166                 sd_event_source_set_enabled(bus->input_io_event_source, SD_EVENT_OFF);
3167                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
3168         }
3169
3170         if (bus->output_io_event_source) {
3171                 sd_event_source_set_enabled(bus->output_io_event_source, SD_EVENT_OFF);
3172                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
3173         }
3174 }
3175
3176 _public_ int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
3177         int r;
3178
3179         assert_return(bus, -EINVAL);
3180         assert_return(!bus->event, -EBUSY);
3181
3182         assert(!bus->input_io_event_source);
3183         assert(!bus->output_io_event_source);
3184         assert(!bus->time_event_source);
3185
3186         if (event)
3187                 bus->event = sd_event_ref(event);
3188         else  {
3189                 r = sd_event_default(&bus->event);
3190                 if (r < 0)
3191                         return r;
3192         }
3193
3194         bus->event_priority = priority;
3195
3196         r = sd_event_add_time(bus->event, &bus->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, bus);
3197         if (r < 0)
3198                 goto fail;
3199
3200         r = sd_event_source_set_priority(bus->time_event_source, priority);
3201         if (r < 0)
3202                 goto fail;
3203
3204         r = sd_event_source_set_description(bus->time_event_source, "bus-time");
3205         if (r < 0)
3206                 goto fail;
3207
3208         r = sd_event_add_exit(bus->event, &bus->quit_event_source, quit_callback, bus);
3209         if (r < 0)
3210                 goto fail;
3211
3212         r = sd_event_source_set_description(bus->quit_event_source, "bus-exit");
3213         if (r < 0)
3214                 goto fail;
3215
3216         r = attach_io_events(bus);
3217         if (r < 0)
3218                 goto fail;
3219
3220         return 0;
3221
3222 fail:
3223         sd_bus_detach_event(bus);
3224         return r;
3225 }
3226
3227 _public_ int sd_bus_detach_event(sd_bus *bus) {
3228         assert_return(bus, -EINVAL);
3229
3230         if (!bus->event)
3231                 return 0;
3232
3233         detach_io_events(bus);
3234
3235         if (bus->time_event_source) {
3236                 sd_event_source_set_enabled(bus->time_event_source, SD_EVENT_OFF);
3237                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
3238         }
3239
3240         if (bus->quit_event_source) {
3241                 sd_event_source_set_enabled(bus->quit_event_source, SD_EVENT_OFF);
3242                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
3243         }
3244
3245         bus->event = sd_event_unref(bus->event);
3246         return 1;
3247 }
3248
3249 _public_ sd_event* sd_bus_get_event(sd_bus *bus) {
3250         assert_return(bus, NULL);
3251
3252         return bus->event;
3253 }
3254
3255 _public_ sd_bus_message* sd_bus_get_current_message(sd_bus *bus) {
3256         assert_return(bus, NULL);
3257
3258         return bus->current_message;