chiark / gitweb /
55d964ed8f0269d20ae09fb4f5470e199f4220d5
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49
50 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
51
52 static void bus_close_fds(sd_bus *b) {
53         assert(b);
54
55         if (b->input_fd >= 0)
56                 close_nointr_nofail(b->input_fd);
57
58         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
59                 close_nointr_nofail(b->output_fd);
60
61         b->input_fd = b->output_fd = -1;
62 }
63
64 static void bus_node_destroy(sd_bus *b, struct node *n) {
65         struct node_callback *c;
66         struct node_vtable *v;
67         struct node_enumerator *e;
68
69         assert(b);
70
71         if (!n)
72                 return;
73
74         while (n->child)
75                 bus_node_destroy(b, n->child);
76
77         while ((c = n->callbacks)) {
78                 LIST_REMOVE(callbacks, n->callbacks, c);
79                 free(c);
80         }
81
82         while ((v = n->vtables)) {
83                 LIST_REMOVE(vtables, n->vtables, v);
84                 free(v->interface);
85                 free(v);
86         }
87
88         while ((e = n->enumerators)) {
89                 LIST_REMOVE(enumerators, n->enumerators, e);
90                 free(e);
91         }
92
93         if (n->parent)
94                 LIST_REMOVE(siblings, n->parent->child, n);
95
96         assert_se(hashmap_remove(b->nodes, n->path) == n);
97         free(n->path);
98         free(n);
99 }
100
101 static void bus_free(sd_bus *b) {
102         struct filter_callback *f;
103         struct node *n;
104         unsigned i;
105
106         assert(b);
107
108         sd_bus_detach_event(b);
109
110         bus_close_fds(b);
111
112         if (b->kdbus_buffer)
113                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
114
115         free(b->rbuffer);
116         free(b->unique_name);
117         free(b->auth_buffer);
118         free(b->address);
119         free(b->kernel);
120
121         free(b->exec_path);
122         strv_free(b->exec_argv);
123
124         close_many(b->fds, b->n_fds);
125         free(b->fds);
126
127         for (i = 0; i < b->rqueue_size; i++)
128                 sd_bus_message_unref(b->rqueue[i]);
129         free(b->rqueue);
130
131         for (i = 0; i < b->wqueue_size; i++)
132                 sd_bus_message_unref(b->wqueue[i]);
133         free(b->wqueue);
134
135         hashmap_free_free(b->reply_callbacks);
136         prioq_free(b->reply_callbacks_prioq);
137
138         while ((f = b->filter_callbacks)) {
139                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
140                 free(f);
141         }
142
143         bus_match_free(&b->match_callbacks);
144
145         hashmap_free_free(b->vtable_methods);
146         hashmap_free_free(b->vtable_properties);
147
148         while ((n = hashmap_first(b->nodes)))
149                 bus_node_destroy(b, n);
150
151         hashmap_free(b->nodes);
152
153         bus_kernel_flush_memfd(b);
154
155         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
156
157         free(b);
158 }
159
160 int sd_bus_new(sd_bus **ret) {
161         sd_bus *r;
162
163         assert_return(ret, -EINVAL);
164
165         r = new0(sd_bus, 1);
166         if (!r)
167                 return -ENOMEM;
168
169         r->n_ref = REFCNT_INIT;
170         r->input_fd = r->output_fd = -1;
171         r->message_version = 1;
172         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
173         r->original_pid = getpid();
174
175         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
176
177         /* We guarantee that wqueue always has space for at least one
178          * entry */
179         r->wqueue = new(sd_bus_message*, 1);
180         if (!r->wqueue) {
181                 free(r);
182                 return -ENOMEM;
183         }
184
185         *ret = r;
186         return 0;
187 }
188
189 int sd_bus_set_address(sd_bus *bus, const char *address) {
190         char *a;
191
192         assert_return(bus, -EINVAL);
193         assert_return(bus->state == BUS_UNSET, -EPERM);
194         assert_return(address, -EINVAL);
195         assert_return(!bus_pid_changed(bus), -ECHILD);
196
197         a = strdup(address);
198         if (!a)
199                 return -ENOMEM;
200
201         free(bus->address);
202         bus->address = a;
203
204         return 0;
205 }
206
207 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
208         assert_return(bus, -EINVAL);
209         assert_return(bus->state == BUS_UNSET, -EPERM);
210         assert_return(input_fd >= 0, -EINVAL);
211         assert_return(output_fd >= 0, -EINVAL);
212         assert_return(!bus_pid_changed(bus), -ECHILD);
213
214         bus->input_fd = input_fd;
215         bus->output_fd = output_fd;
216         return 0;
217 }
218
219 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
220         char *p, **a;
221
222         assert_return(bus, -EINVAL);
223         assert_return(bus->state == BUS_UNSET, -EPERM);
224         assert_return(path, -EINVAL);
225         assert_return(!strv_isempty(argv), -EINVAL);
226         assert_return(!bus_pid_changed(bus), -ECHILD);
227
228         p = strdup(path);
229         if (!p)
230                 return -ENOMEM;
231
232         a = strv_copy(argv);
233         if (!a) {
234                 free(p);
235                 return -ENOMEM;
236         }
237
238         free(bus->exec_path);
239         strv_free(bus->exec_argv);
240
241         bus->exec_path = p;
242         bus->exec_argv = a;
243
244         return 0;
245 }
246
247 int sd_bus_set_bus_client(sd_bus *bus, int b) {
248         assert_return(bus, -EINVAL);
249         assert_return(bus->state == BUS_UNSET, -EPERM);
250         assert_return(!bus_pid_changed(bus), -ECHILD);
251
252         bus->bus_client = !!b;
253         return 0;
254 }
255
256 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
257         assert_return(bus, -EINVAL);
258         assert_return(bus->state == BUS_UNSET, -EPERM);
259         assert_return(!bus_pid_changed(bus), -ECHILD);
260
261         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
262         return 0;
263 }
264
265 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
266         assert_return(bus, -EINVAL);
267         assert_return(bus->state == BUS_UNSET, -EPERM);
268         assert_return(!bus_pid_changed(bus), -ECHILD);
269
270         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
271         return 0;
272 }
273
274 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
275         assert_return(bus, -EINVAL);
276         assert_return(bus->state == BUS_UNSET, -EPERM);
277         assert_return(!bus_pid_changed(bus), -ECHILD);
278
279         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
280         return 0;
281 }
282
283 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
284         assert_return(bus, -EINVAL);
285         assert_return(bus->state == BUS_UNSET, -EPERM);
286         assert_return(!bus_pid_changed(bus), -ECHILD);
287
288         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
289         return 0;
290 }
291
292 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
293         assert_return(bus, -EINVAL);
294         assert_return(bus->state == BUS_UNSET, -EPERM);
295         assert_return(!bus_pid_changed(bus), -ECHILD);
296
297         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
298         return 0;
299 }
300
301 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
302         assert_return(bus, -EINVAL);
303         assert_return(bus->state == BUS_UNSET, -EPERM);
304         assert_return(!bus_pid_changed(bus), -ECHILD);
305
306         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
307         return 0;
308 }
309
310 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
311         assert_return(bus, -EINVAL);
312         assert_return(bus->state == BUS_UNSET, -EPERM);
313         assert_return(!bus_pid_changed(bus), -ECHILD);
314
315         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
316         return 0;
317 }
318
319 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
320         assert_return(bus, -EINVAL);
321         assert_return(bus->state == BUS_UNSET, -EPERM);
322         assert_return(!bus_pid_changed(bus), -ECHILD);
323
324         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
325         return 0;
326 }
327
328 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
329         assert_return(bus, -EINVAL);
330         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
331         assert_return(bus->state == BUS_UNSET, -EPERM);
332         assert_return(!bus_pid_changed(bus), -ECHILD);
333
334         bus->is_server = !!b;
335         bus->server_id = server_id;
336         return 0;
337 }
338
339 int sd_bus_set_anonymous(sd_bus *bus, int b) {
340         assert_return(bus, -EINVAL);
341         assert_return(bus->state == BUS_UNSET, -EPERM);
342         assert_return(!bus_pid_changed(bus), -ECHILD);
343
344         bus->anonymous_auth = !!b;
345         return 0;
346 }
347
348 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
349         const char *s;
350         int r;
351
352         assert(bus);
353         assert(bus->state == BUS_HELLO);
354         assert(reply);
355
356         r = sd_bus_message_get_errno(reply);
357         if (r < 0)
358                 return r;
359         if (r > 0)
360                 return -r;
361
362         r = sd_bus_message_read(reply, "s", &s);
363         if (r < 0)
364                 return r;
365
366         if (!service_name_is_valid(s) || s[0] != ':')
367                 return -EBADMSG;
368
369         bus->unique_name = strdup(s);
370         if (!bus->unique_name)
371                 return -ENOMEM;
372
373         bus->state = BUS_RUNNING;
374
375         return 1;
376 }
377
378 static int bus_send_hello(sd_bus *bus) {
379         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
380         int r;
381
382         assert(bus);
383
384         if (!bus->bus_client || bus->is_kernel)
385                 return 0;
386
387         r = sd_bus_message_new_method_call(
388                         bus,
389                         "org.freedesktop.DBus",
390                         "/",
391                         "org.freedesktop.DBus",
392                         "Hello",
393                         &m);
394         if (r < 0)
395                 return r;
396
397         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
398 }
399
400 int bus_start_running(sd_bus *bus) {
401         assert(bus);
402
403         if (bus->bus_client && !bus->is_kernel) {
404                 bus->state = BUS_HELLO;
405                 return 1;
406         }
407
408         bus->state = BUS_RUNNING;
409         return 1;
410 }
411
412 static int parse_address_key(const char **p, const char *key, char **value) {
413         size_t l, n = 0;
414         const char *a;
415         char *r = NULL;
416
417         assert(p);
418         assert(*p);
419         assert(value);
420
421         if (key) {
422                 l = strlen(key);
423                 if (strncmp(*p, key, l) != 0)
424                         return 0;
425
426                 if ((*p)[l] != '=')
427                         return 0;
428
429                 if (*value)
430                         return -EINVAL;
431
432                 a = *p + l + 1;
433         } else
434                 a = *p;
435
436         while (*a != ';' && *a != ',' && *a != 0) {
437                 char c, *t;
438
439                 if (*a == '%') {
440                         int x, y;
441
442                         x = unhexchar(a[1]);
443                         if (x < 0) {
444                                 free(r);
445                                 return x;
446                         }
447
448                         y = unhexchar(a[2]);
449                         if (y < 0) {
450                                 free(r);
451                                 return y;
452                         }
453
454                         c = (char) ((x << 4) | y);
455                         a += 3;
456                 } else {
457                         c = *a;
458                         a++;
459                 }
460
461                 t = realloc(r, n + 2);
462                 if (!t) {
463                         free(r);
464                         return -ENOMEM;
465                 }
466
467                 r = t;
468                 r[n++] = c;
469         }
470
471         if (!r) {
472                 r = strdup("");
473                 if (!r)
474                         return -ENOMEM;
475         } else
476                 r[n] = 0;
477
478         if (*a == ',')
479                 a++;
480
481         *p = a;
482
483         free(*value);
484         *value = r;
485
486         return 1;
487 }
488
489 static void skip_address_key(const char **p) {
490         assert(p);
491         assert(*p);
492
493         *p += strcspn(*p, ",");
494
495         if (**p == ',')
496                 (*p) ++;
497 }
498
499 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
500         _cleanup_free_ char *path = NULL, *abstract = NULL;
501         size_t l;
502         int r;
503
504         assert(b);
505         assert(p);
506         assert(*p);
507         assert(guid);
508
509         while (**p != 0 && **p != ';') {
510                 r = parse_address_key(p, "guid", guid);
511                 if (r < 0)
512                         return r;
513                 else if (r > 0)
514                         continue;
515
516                 r = parse_address_key(p, "path", &path);
517                 if (r < 0)
518                         return r;
519                 else if (r > 0)
520                         continue;
521
522                 r = parse_address_key(p, "abstract", &abstract);
523                 if (r < 0)
524                         return r;
525                 else if (r > 0)
526                         continue;
527
528                 skip_address_key(p);
529         }
530
531         if (!path && !abstract)
532                 return -EINVAL;
533
534         if (path && abstract)
535                 return -EINVAL;
536
537         if (path) {
538                 l = strlen(path);
539                 if (l > sizeof(b->sockaddr.un.sun_path))
540                         return -E2BIG;
541
542                 b->sockaddr.un.sun_family = AF_UNIX;
543                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
544                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
545         } else if (abstract) {
546                 l = strlen(abstract);
547                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
548                         return -E2BIG;
549
550                 b->sockaddr.un.sun_family = AF_UNIX;
551                 b->sockaddr.un.sun_path[0] = 0;
552                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
553                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
554         }
555
556         return 0;
557 }
558
559 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
560         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
561         int r;
562         struct addrinfo *result, hints = {
563                 .ai_socktype = SOCK_STREAM,
564                 .ai_flags = AI_ADDRCONFIG,
565         };
566
567         assert(b);
568         assert(p);
569         assert(*p);
570         assert(guid);
571
572         while (**p != 0 && **p != ';') {
573                 r = parse_address_key(p, "guid", guid);
574                 if (r < 0)
575                         return r;
576                 else if (r > 0)
577                         continue;
578
579                 r = parse_address_key(p, "host", &host);
580                 if (r < 0)
581                         return r;
582                 else if (r > 0)
583                         continue;
584
585                 r = parse_address_key(p, "port", &port);
586                 if (r < 0)
587                         return r;
588                 else if (r > 0)
589                         continue;
590
591                 r = parse_address_key(p, "family", &family);
592                 if (r < 0)
593                         return r;
594                 else if (r > 0)
595                         continue;
596
597                 skip_address_key(p);
598         }
599
600         if (!host || !port)
601                 return -EINVAL;
602
603         if (family) {
604                 if (streq(family, "ipv4"))
605                         hints.ai_family = AF_INET;
606                 else if (streq(family, "ipv6"))
607                         hints.ai_family = AF_INET6;
608                 else
609                         return -EINVAL;
610         }
611
612         r = getaddrinfo(host, port, &hints, &result);
613         if (r == EAI_SYSTEM)
614                 return -errno;
615         else if (r != 0)
616                 return -EADDRNOTAVAIL;
617
618         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
619         b->sockaddr_size = result->ai_addrlen;
620
621         freeaddrinfo(result);
622
623         return 0;
624 }
625
626 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
627         char *path = NULL;
628         unsigned n_argv = 0, j;
629         char **argv = NULL;
630         int r;
631
632         assert(b);
633         assert(p);
634         assert(*p);
635         assert(guid);
636
637         while (**p != 0 && **p != ';') {
638                 r = parse_address_key(p, "guid", guid);
639                 if (r < 0)
640                         goto fail;
641                 else if (r > 0)
642                         continue;
643
644                 r = parse_address_key(p, "path", &path);
645                 if (r < 0)
646                         goto fail;
647                 else if (r > 0)
648                         continue;
649
650                 if (startswith(*p, "argv")) {
651                         unsigned ul;
652
653                         errno = 0;
654                         ul = strtoul(*p + 4, (char**) p, 10);
655                         if (errno > 0 || **p != '=' || ul > 256) {
656                                 r = -EINVAL;
657                                 goto fail;
658                         }
659
660                         (*p) ++;
661
662                         if (ul >= n_argv) {
663                                 char **x;
664
665                                 x = realloc(argv, sizeof(char*) * (ul + 2));
666                                 if (!x) {
667                                         r = -ENOMEM;
668                                         goto fail;
669                                 }
670
671                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
672
673                                 argv = x;
674                                 n_argv = ul + 1;
675                         }
676
677                         r = parse_address_key(p, NULL, argv + ul);
678                         if (r < 0)
679                                 goto fail;
680
681                         continue;
682                 }
683
684                 skip_address_key(p);
685         }
686
687         if (!path) {
688                 r = -EINVAL;
689                 goto fail;
690         }
691
692         /* Make sure there are no holes in the array, with the
693          * exception of argv[0] */
694         for (j = 1; j < n_argv; j++)
695                 if (!argv[j]) {
696                         r = -EINVAL;
697                         goto fail;
698                 }
699
700         if (argv && argv[0] == NULL) {
701                 argv[0] = strdup(path);
702                 if (!argv[0]) {
703                         r = -ENOMEM;
704                         goto fail;
705                 }
706         }
707
708         b->exec_path = path;
709         b->exec_argv = argv;
710         return 0;
711
712 fail:
713         for (j = 0; j < n_argv; j++)
714                 free(argv[j]);
715
716         free(argv);
717         free(path);
718         return r;
719 }
720
721 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
722         _cleanup_free_ char *path = NULL;
723         int r;
724
725         assert(b);
726         assert(p);
727         assert(*p);
728         assert(guid);
729
730         while (**p != 0 && **p != ';') {
731                 r = parse_address_key(p, "guid", guid);
732                 if (r < 0)
733                         return r;
734                 else if (r > 0)
735                         continue;
736
737                 r = parse_address_key(p, "path", &path);
738                 if (r < 0)
739                         return r;
740                 else if (r > 0)
741                         continue;
742
743                 skip_address_key(p);
744         }
745
746         if (!path)
747                 return -EINVAL;
748
749         free(b->kernel);
750         b->kernel = path;
751         path = NULL;
752
753         return 0;
754 }
755
756 static void bus_reset_parsed_address(sd_bus *b) {
757         assert(b);
758
759         zero(b->sockaddr);
760         b->sockaddr_size = 0;
761         strv_free(b->exec_argv);
762         free(b->exec_path);
763         b->exec_path = NULL;
764         b->exec_argv = NULL;
765         b->server_id = SD_ID128_NULL;
766         free(b->kernel);
767         b->kernel = NULL;
768 }
769
770 static int bus_parse_next_address(sd_bus *b) {
771         _cleanup_free_ char *guid = NULL;
772         const char *a;
773         int r;
774
775         assert(b);
776
777         if (!b->address)
778                 return 0;
779         if (b->address[b->address_index] == 0)
780                 return 0;
781
782         bus_reset_parsed_address(b);
783
784         a = b->address + b->address_index;
785
786         while (*a != 0) {
787
788                 if (*a == ';') {
789                         a++;
790                         continue;
791                 }
792
793                 if (startswith(a, "unix:")) {
794                         a += 5;
795
796                         r = parse_unix_address(b, &a, &guid);
797                         if (r < 0)
798                                 return r;
799                         break;
800
801                 } else if (startswith(a, "tcp:")) {
802
803                         a += 4;
804                         r = parse_tcp_address(b, &a, &guid);
805                         if (r < 0)
806                                 return r;
807
808                         break;
809
810                 } else if (startswith(a, "unixexec:")) {
811
812                         a += 9;
813                         r = parse_exec_address(b, &a, &guid);
814                         if (r < 0)
815                                 return r;
816
817                         break;
818
819                 } else if (startswith(a, "kernel:")) {
820
821                         a += 7;
822                         r = parse_kernel_address(b, &a, &guid);
823                         if (r < 0)
824                                 return r;
825
826                         break;
827                 }
828
829                 a = strchr(a, ';');
830                 if (!a)
831                         return 0;
832         }
833
834         if (guid) {
835                 r = sd_id128_from_string(guid, &b->server_id);
836                 if (r < 0)
837                         return r;
838         }
839
840         b->address_index = a - b->address;
841         return 1;
842 }
843
844 static int bus_start_address(sd_bus *b) {
845         int r;
846
847         assert(b);
848
849         for (;;) {
850                 sd_bus_close(b);
851
852                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
853
854                         r = bus_socket_connect(b);
855                         if (r >= 0)
856                                 return r;
857
858                         b->last_connect_error = -r;
859
860                 } else if (b->exec_path) {
861
862                         r = bus_socket_exec(b);
863                         if (r >= 0)
864                                 return r;
865
866                         b->last_connect_error = -r;
867                 } else if (b->kernel) {
868
869                         r = bus_kernel_connect(b);
870                         if (r >= 0)
871                                 return r;
872
873                         b->last_connect_error = -r;
874                 }
875
876                 r = bus_parse_next_address(b);
877                 if (r < 0)
878                         return r;
879                 if (r == 0)
880                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
881         }
882 }
883
884 int bus_next_address(sd_bus *b) {
885         assert(b);
886
887         bus_reset_parsed_address(b);
888         return bus_start_address(b);
889 }
890
891 static int bus_start_fd(sd_bus *b) {
892         struct stat st;
893         int r;
894
895         assert(b);
896         assert(b->input_fd >= 0);
897         assert(b->output_fd >= 0);
898
899         r = fd_nonblock(b->input_fd, true);
900         if (r < 0)
901                 return r;
902
903         r = fd_cloexec(b->input_fd, true);
904         if (r < 0)
905                 return r;
906
907         if (b->input_fd != b->output_fd) {
908                 r = fd_nonblock(b->output_fd, true);
909                 if (r < 0)
910                         return r;
911
912                 r = fd_cloexec(b->output_fd, true);
913                 if (r < 0)
914                         return r;
915         }
916
917         if (fstat(b->input_fd, &st) < 0)
918                 return -errno;
919
920         if (S_ISCHR(b->input_fd))
921                 return bus_kernel_take_fd(b);
922         else
923                 return bus_socket_take_fd(b);
924 }
925
926 int sd_bus_start(sd_bus *bus) {
927         int r;
928
929         assert_return(bus, -EINVAL);
930         assert_return(bus->state == BUS_UNSET, -EPERM);
931         assert_return(!bus_pid_changed(bus), -ECHILD);
932
933         bus->state = BUS_OPENING;
934
935         if (bus->is_server && bus->bus_client)
936                 return -EINVAL;
937
938         if (bus->input_fd >= 0)
939                 r = bus_start_fd(bus);
940         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
941                 r = bus_start_address(bus);
942         else
943                 return -EINVAL;
944
945         if (r < 0)
946                 return r;
947
948         return bus_send_hello(bus);
949 }
950
951 int sd_bus_open_system(sd_bus **ret) {
952         const char *e;
953         sd_bus *b;
954         int r;
955
956         assert_return(ret, -EINVAL);
957
958         r = sd_bus_new(&b);
959         if (r < 0)
960                 return r;
961
962         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
963         if (e) {
964                 r = sd_bus_set_address(b, e);
965                 if (r < 0)
966                         goto fail;
967         } else {
968                 b->sockaddr.un.sun_family = AF_UNIX;
969                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
970                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
971         }
972
973         b->bus_client = true;
974
975         r = sd_bus_start(b);
976         if (r < 0)
977                 goto fail;
978
979         *ret = b;
980         return 0;
981
982 fail:
983         bus_free(b);
984         return r;
985 }
986
987 int sd_bus_open_user(sd_bus **ret) {
988         const char *e;
989         sd_bus *b;
990         size_t l;
991         int r;
992
993         assert_return(ret, -EINVAL);
994
995         r = sd_bus_new(&b);
996         if (r < 0)
997                 return r;
998
999         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1000         if (e) {
1001                 r = sd_bus_set_address(b, e);
1002                 if (r < 0)
1003                         goto fail;
1004         } else {
1005                 e = secure_getenv("XDG_RUNTIME_DIR");
1006                 if (!e) {
1007                         r = -ENOENT;
1008                         goto fail;
1009                 }
1010
1011                 l = strlen(e);
1012                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1013                         r = -E2BIG;
1014                         goto fail;
1015                 }
1016
1017                 b->sockaddr.un.sun_family = AF_UNIX;
1018                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1019                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1020         }
1021
1022         b->bus_client = true;
1023
1024         r = sd_bus_start(b);
1025         if (r < 0)
1026                 goto fail;
1027
1028         *ret = b;
1029         return 0;
1030
1031 fail:
1032         bus_free(b);
1033         return r;
1034 }
1035
1036 void sd_bus_close(sd_bus *bus) {
1037         if (!bus)
1038                 return;
1039         if (bus->state == BUS_CLOSED)
1040                 return;
1041         if (bus_pid_changed(bus))
1042                 return;
1043
1044         bus->state = BUS_CLOSED;
1045
1046         sd_bus_detach_event(bus);
1047
1048         if (!bus->is_kernel)
1049                 bus_close_fds(bus);
1050
1051         /* We'll leave the fd open in case this is a kernel bus, since
1052          * there might still be memblocks around that reference this
1053          * bus, and they might need to invoke the
1054          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1055          * freed. */
1056 }
1057
1058 sd_bus *sd_bus_ref(sd_bus *bus) {
1059         if (!bus)
1060                 return NULL;
1061
1062         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1063
1064         return bus;
1065 }
1066
1067 sd_bus *sd_bus_unref(sd_bus *bus) {
1068         if (!bus)
1069                 return NULL;
1070
1071         if (REFCNT_DEC(bus->n_ref) <= 0)
1072                 bus_free(bus);
1073
1074         return NULL;
1075 }
1076
1077 int sd_bus_is_open(sd_bus *bus) {
1078
1079         assert_return(bus, -EINVAL);
1080         assert_return(!bus_pid_changed(bus), -ECHILD);
1081
1082         return BUS_IS_OPEN(bus->state);
1083 }
1084
1085 int sd_bus_can_send(sd_bus *bus, char type) {
1086         int r;
1087
1088         assert_return(bus, -EINVAL);
1089         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1090         assert_return(!bus_pid_changed(bus), -ECHILD);
1091
1092         if (type == SD_BUS_TYPE_UNIX_FD) {
1093                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1094                         return 0;
1095
1096                 r = bus_ensure_running(bus);
1097                 if (r < 0)
1098                         return r;
1099
1100                 return bus->can_fds;
1101         }
1102
1103         return bus_type_is_valid(type);
1104 }
1105
1106 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1107         int r;
1108
1109         assert_return(bus, -EINVAL);
1110         assert_return(server_id, -EINVAL);
1111         assert_return(!bus_pid_changed(bus), -ECHILD);
1112
1113         r = bus_ensure_running(bus);
1114         if (r < 0)
1115                 return r;
1116
1117         *server_id = bus->server_id;
1118         return 0;
1119 }
1120
1121 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1122         assert(m);
1123
1124         if (m->header->version > b->message_version)
1125                 return -EPERM;
1126
1127         if (m->sealed)
1128                 return 0;
1129
1130         return bus_message_seal(m, ++b->serial);
1131 }
1132
1133 static int dispatch_wqueue(sd_bus *bus) {
1134         int r, ret = 0;
1135
1136         assert(bus);
1137         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1138
1139         while (bus->wqueue_size > 0) {
1140
1141                 if (bus->is_kernel)
1142                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1143                 else
1144                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1145
1146                 if (r < 0) {
1147                         sd_bus_close(bus);
1148                         return r;
1149                 } else if (r == 0)
1150                         /* Didn't do anything this time */
1151                         return ret;
1152                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1153                         /* Fully written. Let's drop the entry from
1154                          * the queue.
1155                          *
1156                          * This isn't particularly optimized, but
1157                          * well, this is supposed to be our worst-case
1158                          * buffer only, and the socket buffer is
1159                          * supposed to be our primary buffer, and if
1160                          * it got full, then all bets are off
1161                          * anyway. */
1162
1163                         sd_bus_message_unref(bus->wqueue[0]);
1164                         bus->wqueue_size --;
1165                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1166                         bus->windex = 0;
1167
1168                         ret = 1;
1169                 }
1170         }
1171
1172         return ret;
1173 }
1174
1175 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1176         sd_bus_message *z = NULL;
1177         int r, ret = 0;
1178
1179         assert(bus);
1180         assert(m);
1181         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1182
1183         if (bus->rqueue_size > 0) {
1184                 /* Dispatch a queued message */
1185
1186                 *m = bus->rqueue[0];
1187                 bus->rqueue_size --;
1188                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1189                 return 1;
1190         }
1191
1192         /* Try to read a new message */
1193         do {
1194                 if (bus->is_kernel)
1195                         r = bus_kernel_read_message(bus, &z);
1196                 else
1197                         r = bus_socket_read_message(bus, &z);
1198
1199                 if (r < 0) {
1200                         sd_bus_close(bus);
1201                         return r;
1202                 }
1203                 if (r == 0)
1204                         return ret;
1205
1206                 ret = 1;
1207         } while (!z);
1208
1209         *m = z;
1210         return ret;
1211 }
1212
1213 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1214         int r;
1215
1216         assert_return(bus, -EINVAL);
1217         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1218         assert_return(m, -EINVAL);
1219         assert_return(!bus_pid_changed(bus), -ECHILD);
1220
1221         if (m->n_fds > 0) {
1222                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1223                 if (r < 0)
1224                         return r;
1225                 if (r == 0)
1226                         return -ENOTSUP;
1227         }
1228
1229         /* If the serial number isn't kept, then we know that no reply
1230          * is expected */
1231         if (!serial && !m->sealed)
1232                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1233
1234         r = bus_seal_message(bus, m);
1235         if (r < 0)
1236                 return r;
1237
1238         /* If this is a reply and no reply was requested, then let's
1239          * suppress this, if we can */
1240         if (m->dont_send && !serial)
1241                 return 1;
1242
1243         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1244                 size_t idx = 0;
1245
1246                 if (bus->is_kernel)
1247                         r = bus_kernel_write_message(bus, m);
1248                 else
1249                         r = bus_socket_write_message(bus, m, &idx);
1250
1251                 if (r < 0) {
1252                         sd_bus_close(bus);
1253                         return r;
1254                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1255                         /* Wasn't fully written. So let's remember how
1256                          * much was written. Note that the first entry
1257                          * of the wqueue array is always allocated so
1258                          * that we always can remember how much was
1259                          * written. */
1260                         bus->wqueue[0] = sd_bus_message_ref(m);
1261                         bus->wqueue_size = 1;
1262                         bus->windex = idx;
1263                 }
1264         } else {
1265                 sd_bus_message **q;
1266
1267                 /* Just append it to the queue. */
1268
1269                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1270                         return -ENOBUFS;
1271
1272                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1273                 if (!q)
1274                         return -ENOMEM;
1275
1276                 bus->wqueue = q;
1277                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1278         }
1279
1280         if (serial)
1281                 *serial = BUS_MESSAGE_SERIAL(m);
1282
1283         return 1;
1284 }
1285
1286 static usec_t calc_elapse(uint64_t usec) {
1287         if (usec == (uint64_t) -1)
1288                 return 0;
1289
1290         if (usec == 0)
1291                 usec = BUS_DEFAULT_TIMEOUT;
1292
1293         return now(CLOCK_MONOTONIC) + usec;
1294 }
1295
1296 static int timeout_compare(const void *a, const void *b) {
1297         const struct reply_callback *x = a, *y = b;
1298
1299         if (x->timeout != 0 && y->timeout == 0)
1300                 return -1;
1301
1302         if (x->timeout == 0 && y->timeout != 0)
1303                 return 1;
1304
1305         if (x->timeout < y->timeout)
1306                 return -1;
1307
1308         if (x->timeout > y->timeout)
1309                 return 1;
1310
1311         return 0;
1312 }
1313
1314 int sd_bus_send_with_reply(
1315                 sd_bus *bus,
1316                 sd_bus_message *m,
1317                 sd_bus_message_handler_t callback,
1318                 void *userdata,
1319                 uint64_t usec,
1320                 uint64_t *serial) {
1321
1322         struct reply_callback *c;
1323         int r;
1324
1325         assert_return(bus, -EINVAL);
1326         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1327         assert_return(m, -EINVAL);
1328         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1329         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1330         assert_return(callback, -EINVAL);
1331         assert_return(!bus_pid_changed(bus), -ECHILD);
1332
1333         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1334         if (r < 0)
1335                 return r;
1336
1337         if (usec != (uint64_t) -1) {
1338                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1339                 if (r < 0)
1340                         return r;
1341         }
1342
1343         r = bus_seal_message(bus, m);
1344         if (r < 0)
1345                 return r;
1346
1347         c = new0(struct reply_callback, 1);
1348         if (!c)
1349                 return -ENOMEM;
1350
1351         c->callback = callback;
1352         c->userdata = userdata;
1353         c->serial = BUS_MESSAGE_SERIAL(m);
1354         c->timeout = calc_elapse(usec);
1355
1356         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1357         if (r < 0) {
1358                 free(c);
1359                 return r;
1360         }
1361
1362         if (c->timeout != 0) {
1363                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1364                 if (r < 0) {
1365                         c->timeout = 0;
1366                         sd_bus_send_with_reply_cancel(bus, c->serial);
1367                         return r;
1368                 }
1369         }
1370
1371         r = sd_bus_send(bus, m, serial);
1372         if (r < 0) {
1373                 sd_bus_send_with_reply_cancel(bus, c->serial);
1374                 return r;
1375         }
1376
1377         return r;
1378 }
1379
1380 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1381         struct reply_callback *c;
1382
1383         assert_return(bus, -EINVAL);
1384         assert_return(serial != 0, -EINVAL);
1385         assert_return(!bus_pid_changed(bus), -ECHILD);
1386
1387         c = hashmap_remove(bus->reply_callbacks, &serial);
1388         if (!c)
1389                 return 0;
1390
1391         if (c->timeout != 0)
1392                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1393
1394         free(c);
1395         return 1;
1396 }
1397
1398 int bus_ensure_running(sd_bus *bus) {
1399         int r;
1400
1401         assert(bus);
1402
1403         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1404                 return -ENOTCONN;
1405         if (bus->state == BUS_RUNNING)
1406                 return 1;
1407
1408         for (;;) {
1409                 r = sd_bus_process(bus, NULL);
1410                 if (r < 0)
1411                         return r;
1412                 if (bus->state == BUS_RUNNING)
1413                         return 1;
1414                 if (r > 0)
1415                         continue;
1416
1417                 r = sd_bus_wait(bus, (uint64_t) -1);
1418                 if (r < 0)
1419                         return r;
1420         }
1421 }
1422
1423 int sd_bus_send_with_reply_and_block(
1424                 sd_bus *bus,
1425                 sd_bus_message *m,
1426                 uint64_t usec,
1427                 sd_bus_error *error,
1428                 sd_bus_message **reply) {
1429
1430         int r;
1431         usec_t timeout;
1432         uint64_t serial;
1433         bool room = false;
1434
1435         assert_return(bus, -EINVAL);
1436         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1437         assert_return(m, -EINVAL);
1438         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1439         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1440         assert_return(!bus_error_is_dirty(error), -EINVAL);
1441         assert_return(!bus_pid_changed(bus), -ECHILD);
1442
1443         r = bus_ensure_running(bus);
1444         if (r < 0)
1445                 return r;
1446
1447         r = sd_bus_send(bus, m, &serial);
1448         if (r < 0)
1449                 return r;
1450
1451         timeout = calc_elapse(usec);
1452
1453         for (;;) {
1454                 usec_t left;
1455                 sd_bus_message *incoming = NULL;
1456
1457                 if (!room) {
1458                         sd_bus_message **q;
1459
1460                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1461                                 return -ENOBUFS;
1462
1463                         /* Make sure there's room for queuing this
1464                          * locally, before we read the message */
1465
1466                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1467                         if (!q)
1468                                 return -ENOMEM;
1469
1470                         bus->rqueue = q;
1471                         room = true;
1472                 }
1473
1474                 if (bus->is_kernel)
1475                         r = bus_kernel_read_message(bus, &incoming);
1476                 else
1477                         r = bus_socket_read_message(bus, &incoming);
1478                 if (r < 0)
1479                         return r;
1480                 if (incoming) {
1481
1482                         if (incoming->reply_serial == serial) {
1483                                 /* Found a match! */
1484
1485                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1486
1487                                         if (reply)
1488                                                 *reply = incoming;
1489                                         else
1490                                                 sd_bus_message_unref(incoming);
1491
1492                                         return 1;
1493                                 }
1494
1495                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1496                                         int k;
1497
1498                                         r = sd_bus_error_copy(error, &incoming->error);
1499                                         if (r < 0) {
1500                                                 sd_bus_message_unref(incoming);
1501                                                 return r;
1502                                         }
1503
1504                                         k = sd_bus_error_get_errno(&incoming->error);
1505                                         sd_bus_message_unref(incoming);
1506                                         return -k;
1507                                 }
1508
1509                                 sd_bus_message_unref(incoming);
1510                                 return -EIO;
1511                         }
1512
1513                         /* There's already guaranteed to be room for
1514                          * this, so need to resize things here */
1515                         bus->rqueue[bus->rqueue_size ++] = incoming;
1516                         room = false;
1517
1518                         /* Try to read more, right-away */
1519                         continue;
1520                 }
1521                 if (r != 0)
1522                         continue;
1523
1524                 if (timeout > 0) {
1525                         usec_t n;
1526
1527                         n = now(CLOCK_MONOTONIC);
1528                         if (n >= timeout)
1529                                 return -ETIMEDOUT;
1530
1531                         left = timeout - n;
1532                 } else
1533                         left = (uint64_t) -1;
1534
1535                 r = bus_poll(bus, true, left);
1536                 if (r < 0)
1537                         return r;
1538
1539                 r = dispatch_wqueue(bus);
1540                 if (r < 0)
1541                         return r;
1542         }
1543 }
1544
1545 int sd_bus_get_fd(sd_bus *bus) {
1546
1547         assert_return(bus, -EINVAL);
1548         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1549         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1550         assert_return(!bus_pid_changed(bus), -ECHILD);
1551
1552         return bus->input_fd;
1553 }
1554
1555 int sd_bus_get_events(sd_bus *bus) {
1556         int flags = 0;
1557
1558         assert_return(bus, -EINVAL);
1559         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1560         assert_return(!bus_pid_changed(bus), -ECHILD);
1561
1562         if (bus->state == BUS_OPENING)
1563                 flags |= POLLOUT;
1564         else if (bus->state == BUS_AUTHENTICATING) {
1565
1566                 if (bus_socket_auth_needs_write(bus))
1567                         flags |= POLLOUT;
1568
1569                 flags |= POLLIN;
1570
1571         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1572                 if (bus->rqueue_size <= 0)
1573                         flags |= POLLIN;
1574                 if (bus->wqueue_size > 0)
1575                         flags |= POLLOUT;
1576         }
1577
1578         return flags;
1579 }
1580
1581 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1582         struct reply_callback *c;
1583
1584         assert_return(bus, -EINVAL);
1585         assert_return(timeout_usec, -EINVAL);
1586         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1587         assert_return(!bus_pid_changed(bus), -ECHILD);
1588
1589         if (bus->state == BUS_AUTHENTICATING) {
1590                 *timeout_usec = bus->auth_timeout;
1591                 return 1;
1592         }
1593
1594         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1595                 *timeout_usec = (uint64_t) -1;
1596                 return 0;
1597         }
1598
1599         if (bus->rqueue_size > 0) {
1600                 *timeout_usec = 0;
1601                 return 1;
1602         }
1603
1604         c = prioq_peek(bus->reply_callbacks_prioq);
1605         if (!c) {
1606                 *timeout_usec = (uint64_t) -1;
1607                 return 0;
1608         }
1609
1610         *timeout_usec = c->timeout;
1611         return 1;
1612 }
1613
1614 static int process_timeout(sd_bus *bus) {
1615         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1616         struct reply_callback *c;
1617         usec_t n;
1618         int r;
1619
1620         assert(bus);
1621
1622         c = prioq_peek(bus->reply_callbacks_prioq);
1623         if (!c)
1624                 return 0;
1625
1626         n = now(CLOCK_MONOTONIC);
1627         if (c->timeout > n)
1628                 return 0;
1629
1630         r = bus_message_new_synthetic_error(
1631                         bus,
1632                         c->serial,
1633                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1634                         &m);
1635         if (r < 0)
1636                 return r;
1637
1638         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1639         hashmap_remove(bus->reply_callbacks, &c->serial);
1640
1641         r = c->callback(bus, m, c->userdata);
1642         free(c);
1643
1644         return r < 0 ? r : 1;
1645 }
1646
1647 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1648         assert(bus);
1649         assert(m);
1650
1651         if (bus->state != BUS_HELLO)
1652                 return 0;
1653
1654         /* Let's make sure the first message on the bus is the HELLO
1655          * reply. But note that we don't actually parse the message
1656          * here (we leave that to the usual handling), we just verify
1657          * we don't let any earlier msg through. */
1658
1659         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1660             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1661                 return -EIO;
1662
1663         if (m->reply_serial != bus->hello_serial)
1664                 return -EIO;
1665
1666         return 0;
1667 }
1668
1669 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1670         struct reply_callback *c;
1671         int r;
1672
1673         assert(bus);
1674         assert(m);
1675
1676         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1677             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1678                 return 0;
1679
1680         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1681         if (!c)
1682                 return 0;
1683
1684         if (c->timeout != 0)
1685                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1686
1687         r = sd_bus_message_rewind(m, true);
1688         if (r < 0)
1689                 return r;
1690
1691         r = c->callback(bus, m, c->userdata);
1692         free(c);
1693
1694         return r;
1695 }
1696
1697 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1698         struct filter_callback *l;
1699         int r;
1700
1701         assert(bus);
1702         assert(m);
1703
1704         do {
1705                 bus->filter_callbacks_modified = false;
1706
1707                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1708
1709                         if (bus->filter_callbacks_modified)
1710                                 break;
1711
1712                         /* Don't run this more than once per iteration */
1713                         if (l->last_iteration == bus->iteration_counter)
1714                                 continue;
1715
1716                         l->last_iteration = bus->iteration_counter;
1717
1718                         r = sd_bus_message_rewind(m, true);
1719                         if (r < 0)
1720                                 return r;
1721
1722                         r = l->callback(bus, m, l->userdata);
1723                         if (r != 0)
1724                                 return r;
1725
1726                 }
1727
1728         } while (bus->filter_callbacks_modified);
1729
1730         return 0;
1731 }
1732
1733 static int process_match(sd_bus *bus, sd_bus_message *m) {
1734         int r;
1735
1736         assert(bus);
1737         assert(m);
1738
1739         do {
1740                 bus->match_callbacks_modified = false;
1741
1742                 r = bus_match_run(bus, &bus->match_callbacks, m);
1743                 if (r != 0)
1744                         return r;
1745
1746         } while (bus->match_callbacks_modified);
1747
1748         return 0;
1749 }
1750
1751 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1752         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1753         int r;
1754
1755         assert(bus);
1756         assert(m);
1757
1758         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1759                 return 0;
1760
1761         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1762                 return 0;
1763
1764         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1765                 return 1;
1766
1767         if (streq_ptr(m->member, "Ping"))
1768                 r = sd_bus_message_new_method_return(bus, m, &reply);
1769         else if (streq_ptr(m->member, "GetMachineId")) {
1770                 sd_id128_t id;
1771                 char sid[33];
1772
1773                 r = sd_id128_get_machine(&id);
1774                 if (r < 0)
1775                         return r;
1776
1777                 r = sd_bus_message_new_method_return(bus, m, &reply);
1778                 if (r < 0)
1779                         return r;
1780
1781                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1782         } else {
1783                 r = sd_bus_message_new_method_errorf(
1784                                 bus, m, &reply,
1785                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1786                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1787         }
1788
1789         if (r < 0)
1790                 return r;
1791
1792         r = sd_bus_send(bus, reply, NULL);
1793         if (r < 0)
1794                 return r;
1795
1796         return 1;
1797 }
1798
1799 static int process_message(sd_bus *bus, sd_bus_message *m) {
1800         int r;
1801
1802         assert(bus);
1803         assert(m);
1804
1805         bus->iteration_counter++;
1806
1807         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1808                   strna(sd_bus_message_get_sender(m)),
1809                   strna(sd_bus_message_get_path(m)),
1810                   strna(sd_bus_message_get_interface(m)),
1811                   strna(sd_bus_message_get_member(m)));
1812
1813         r = process_hello(bus, m);
1814         if (r != 0)
1815                 return r;
1816
1817         r = process_reply(bus, m);
1818         if (r != 0)
1819                 return r;
1820
1821         r = process_filter(bus, m);
1822         if (r != 0)
1823                 return r;
1824
1825         r = process_match(bus, m);
1826         if (r != 0)
1827                 return r;
1828
1829         r = process_builtin(bus, m);
1830         if (r != 0)
1831                 return r;
1832
1833         return bus_process_object(bus, m);
1834 }
1835
1836 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1837         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1838         int r;
1839
1840         assert(bus);
1841         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1842
1843         r = process_timeout(bus);
1844         if (r != 0)
1845                 goto null_message;
1846
1847         r = dispatch_wqueue(bus);
1848         if (r != 0)
1849                 goto null_message;
1850
1851         r = dispatch_rqueue(bus, &m);
1852         if (r < 0)
1853                 return r;
1854         if (!m)
1855                 goto null_message;
1856
1857         r = process_message(bus, m);
1858         if (r != 0)
1859                 goto null_message;
1860
1861         if (ret) {
1862                 r = sd_bus_message_rewind(m, true);
1863                 if (r < 0)
1864                         return r;
1865
1866                 *ret = m;
1867                 m = NULL;
1868                 return 1;
1869         }
1870
1871         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
1872
1873                 r = sd_bus_reply_method_errorf(
1874                                 bus, m,
1875                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
1876                                 "Unknown object '%s'.", m->path);
1877                 if (r < 0)
1878                         return r;
1879         }
1880
1881         return 1;
1882
1883 null_message:
1884         if (r >= 0 && ret)
1885                 *ret = NULL;
1886
1887         return r;
1888 }
1889
1890 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1891         BUS_DONT_DESTROY(bus);
1892         int r;
1893
1894         /* Returns 0 when we didn't do anything. This should cause the
1895          * caller to invoke sd_bus_wait() before returning the next
1896          * time. Returns > 0 when we did something, which possibly
1897          * means *ret is filled in with an unprocessed message. */
1898
1899         assert_return(bus, -EINVAL);
1900         assert_return(!bus_pid_changed(bus), -ECHILD);
1901
1902         /* We don't allow recursively invoking sd_bus_process(). */
1903         assert_return(!bus->processing, -EBUSY);
1904
1905         switch (bus->state) {
1906
1907         case BUS_UNSET:
1908         case BUS_CLOSED:
1909                 return -ENOTCONN;
1910
1911         case BUS_OPENING:
1912                 r = bus_socket_process_opening(bus);
1913                 if (r < 0)
1914                         return r;
1915                 if (ret)
1916                         *ret = NULL;
1917                 return r;
1918
1919         case BUS_AUTHENTICATING:
1920
1921                 r = bus_socket_process_authenticating(bus);
1922                 if (r < 0)
1923                         return r;
1924                 if (ret)
1925                         *ret = NULL;
1926                 return r;
1927
1928         case BUS_RUNNING:
1929         case BUS_HELLO:
1930
1931                 bus->processing = true;
1932                 r = process_running(bus, ret);
1933                 bus->processing = false;
1934
1935                 return r;
1936         }
1937
1938         assert_not_reached("Unknown state");
1939 }
1940
1941 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1942         struct pollfd p[2] = {};
1943         int r, e, n;
1944         struct timespec ts;
1945         usec_t m = (usec_t) -1;
1946
1947         assert(bus);
1948         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1949
1950         e = sd_bus_get_events(bus);
1951         if (e < 0)
1952                 return e;
1953
1954         if (need_more)
1955                 /* The caller really needs some more data, he doesn't
1956                  * care about what's already read, or any timeouts
1957                  * except its own.*/
1958                 e |= POLLIN;
1959         else {
1960                 usec_t until;
1961                 /* The caller wants to process if there's something to
1962                  * process, but doesn't care otherwise */
1963
1964                 r = sd_bus_get_timeout(bus, &until);
1965                 if (r < 0)
1966                         return r;
1967                 if (r > 0) {
1968                         usec_t nw;
1969                         nw = now(CLOCK_MONOTONIC);
1970                         m = until > nw ? until - nw : 0;
1971                 }
1972         }
1973
1974         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1975                 m = timeout_usec;
1976
1977         p[0].fd = bus->input_fd;
1978         if (bus->output_fd == bus->input_fd) {
1979                 p[0].events = e;
1980                 n = 1;
1981         } else {
1982                 p[0].events = e & POLLIN;
1983                 p[1].fd = bus->output_fd;
1984                 p[1].events = e & POLLOUT;
1985                 n = 2;
1986         }
1987
1988         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1989         if (r < 0)
1990                 return -errno;
1991
1992         return r > 0 ? 1 : 0;
1993 }
1994
1995 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1996
1997         assert_return(bus, -EINVAL);
1998         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1999         assert_return(!bus_pid_changed(bus), -ECHILD);
2000
2001         if (bus->rqueue_size > 0)
2002                 return 0;
2003
2004         return bus_poll(bus, false, timeout_usec);
2005 }
2006
2007 int sd_bus_flush(sd_bus *bus) {
2008         int r;
2009
2010         assert_return(bus, -EINVAL);
2011         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2012         assert_return(!bus_pid_changed(bus), -ECHILD);
2013
2014         r = bus_ensure_running(bus);
2015         if (r < 0)
2016                 return r;
2017
2018         if (bus->wqueue_size <= 0)
2019                 return 0;
2020
2021         for (;;) {
2022                 r = dispatch_wqueue(bus);
2023                 if (r < 0)
2024                         return r;
2025
2026                 if (bus->wqueue_size <= 0)
2027                         return 0;
2028
2029                 r = bus_poll(bus, false, (uint64_t) -1);
2030                 if (r < 0)
2031                         return r;
2032         }
2033 }
2034
2035 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2036         struct filter_callback *f;
2037
2038         assert_return(bus, -EINVAL);
2039         assert_return(callback, -EINVAL);
2040         assert_return(!bus_pid_changed(bus), -ECHILD);
2041
2042         f = new0(struct filter_callback, 1);
2043         if (!f)
2044                 return -ENOMEM;
2045         f->callback = callback;
2046         f->userdata = userdata;
2047
2048         bus->filter_callbacks_modified = true;
2049         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2050         return 0;
2051 }
2052
2053 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2054         struct filter_callback *f;
2055
2056         assert_return(bus, -EINVAL);
2057         assert_return(callback, -EINVAL);
2058         assert_return(!bus_pid_changed(bus), -ECHILD);
2059
2060         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2061                 if (f->callback == callback && f->userdata == userdata) {
2062                         bus->filter_callbacks_modified = true;
2063                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2064                         free(f);
2065                         return 1;
2066                 }
2067         }
2068
2069         return 0;
2070 }
2071
2072 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2073         struct bus_match_component *components = NULL;
2074         unsigned n_components = 0;
2075         uint64_t cookie = 0;
2076         int r = 0;
2077
2078         assert_return(bus, -EINVAL);
2079         assert_return(match, -EINVAL);
2080         assert_return(!bus_pid_changed(bus), -ECHILD);
2081
2082         r = bus_match_parse(match, &components, &n_components);
2083         if (r < 0)
2084                 goto finish;
2085
2086         if (bus->bus_client) {
2087                 cookie = ++bus->match_cookie;
2088
2089                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2090                 if (r < 0)
2091                         goto finish;
2092         }
2093
2094         bus->match_callbacks_modified = true;
2095         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2096         if (r < 0) {
2097                 if (bus->bus_client)
2098                         bus_remove_match_internal(bus, match, cookie);
2099         }
2100
2101 finish:
2102         bus_match_parse_free(components, n_components);
2103         return r;
2104 }
2105
2106 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2107         struct bus_match_component *components = NULL;
2108         unsigned n_components = 0;
2109         int r = 0, q = 0;
2110         uint64_t cookie = 0;
2111
2112         assert_return(bus, -EINVAL);
2113         assert_return(match, -EINVAL);
2114         assert_return(!bus_pid_changed(bus), -ECHILD);
2115
2116         r = bus_match_parse(match, &components, &n_components);
2117         if (r < 0)
2118                 return r;
2119
2120         bus->match_callbacks_modified = true;
2121         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2122
2123         if (bus->bus_client)
2124                 q = bus_remove_match_internal(bus, match, cookie);
2125
2126         bus_match_parse_free(components, n_components);
2127
2128         return r < 0 ? r : q;
2129 }
2130
2131 bool bus_pid_changed(sd_bus *bus) {
2132         assert(bus);
2133
2134         /* We don't support people creating a bus connection and
2135          * keeping it around over a fork(). Let's complain. */
2136
2137         return bus->original_pid != getpid();
2138 }
2139
2140 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2141         void *bus = userdata;
2142         int r;
2143
2144         assert(bus);
2145
2146         r = sd_bus_process(bus, NULL);
2147         if (r < 0)
2148                 return r;
2149
2150         return 1;
2151 }
2152
2153 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2154         void *bus = userdata;
2155         int r;
2156
2157         assert(bus);
2158
2159         r = sd_bus_process(bus, NULL);
2160         if (r < 0)
2161                 return r;
2162
2163         return 1;
2164 }
2165
2166 static int prepare_callback(sd_event_source *s, void *userdata) {
2167         sd_bus *bus = userdata;
2168         int r, e;
2169         usec_t until;
2170
2171         assert(s);
2172         assert(bus);
2173
2174         e = sd_bus_get_events(bus);
2175         if (e < 0)
2176                 return e;
2177
2178         if (bus->output_fd != bus->input_fd) {
2179
2180                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2181                 if (r < 0)
2182                         return r;
2183
2184                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2185                 if (r < 0)
2186                         return r;
2187         } else {
2188                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2189                 if (r < 0)
2190                         return r;
2191         }
2192
2193         r = sd_bus_get_timeout(bus, &until);
2194         if (r < 0)
2195                 return r;
2196         if (r > 0) {
2197                 int j;
2198
2199                 j = sd_event_source_set_time(bus->time_event_source, until);
2200                 if (j < 0)
2201                         return j;
2202         }
2203
2204         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2205         if (r < 0)
2206                 return r;
2207
2208         return 1;
2209 }
2210
2211 static int quit_callback(sd_event_source *event, void *userdata) {
2212         sd_bus *bus = userdata;
2213
2214         assert(event);
2215
2216         sd_bus_flush(bus);
2217
2218         return 1;
2219 }
2220
2221 int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2222         int r;
2223
2224         assert_return(bus, -EINVAL);
2225         assert_return(event, -EINVAL);
2226         assert_return(!bus->event, -EBUSY);
2227
2228         assert(!bus->input_io_event_source);
2229         assert(!bus->output_io_event_source);
2230         assert(!bus->time_event_source);
2231
2232         bus->event = sd_event_ref(event);
2233
2234         r = sd_event_add_io(event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2235         if (r < 0)
2236                 goto fail;
2237
2238         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2239         if (r < 0)
2240                 goto fail;
2241
2242         if (bus->output_fd != bus->input_fd) {
2243                 r = sd_event_add_io(event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2244                 if (r < 0)
2245                         goto fail;
2246
2247                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2248                 if (r < 0)
2249                         goto fail;
2250         }
2251
2252         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2253         if (r < 0)
2254                 goto fail;
2255
2256         r = sd_event_add_monotonic(event, 0, 0, time_callback, bus, &bus->time_event_source);
2257         if (r < 0)
2258                 goto fail;
2259
2260         r = sd_event_source_set_priority(bus->time_event_source, priority);
2261         if (r < 0)
2262                 goto fail;
2263
2264         r = sd_event_add_quit(event, quit_callback, bus, &bus->quit_event_source);
2265         if (r < 0)
2266                 goto fail;
2267
2268         return 0;
2269
2270 fail:
2271         sd_bus_detach_event(bus);
2272         return r;
2273 }
2274
2275 int sd_bus_detach_event(sd_bus *bus) {
2276         assert_return(bus, -EINVAL);
2277         assert_return(bus->event, -ENXIO);
2278
2279         if (bus->input_io_event_source)
2280                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2281
2282         if (bus->output_io_event_source)
2283                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2284
2285         if (bus->time_event_source)
2286                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2287
2288         if (bus->quit_event_source)
2289                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2290
2291         if (bus->event)
2292                 bus->event = sd_event_unref(bus->event);
2293
2294         return 0;
2295 }