chiark / gitweb /
bus: make sure sd_bus_get_timeout() returns a 0 timeout if there are already read...
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48
49 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
50
51 static void bus_close_fds(sd_bus *b) {
52         assert(b);
53
54         if (b->input_fd >= 0)
55                 close_nointr_nofail(b->input_fd);
56
57         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
58                 close_nointr_nofail(b->output_fd);
59
60         b->input_fd = b->output_fd = -1;
61 }
62
63 static void bus_node_destroy(sd_bus *b, struct node *n) {
64         struct node_callback *c;
65         struct node_vtable *v;
66         struct node_enumerator *e;
67
68         assert(b);
69
70         if (!n)
71                 return;
72
73         while (n->child)
74                 bus_node_destroy(b, n->child);
75
76         while ((c = n->callbacks)) {
77                 LIST_REMOVE(callbacks, n->callbacks, c);
78                 free(c);
79         }
80
81         while ((v = n->vtables)) {
82                 LIST_REMOVE(vtables, n->vtables, v);
83                 free(v->interface);
84                 free(v);
85         }
86
87         while ((e = n->enumerators)) {
88                 LIST_REMOVE(enumerators, n->enumerators, e);
89                 free(e);
90         }
91
92         if (n->parent)
93                 LIST_REMOVE(siblings, n->parent->child, n);
94
95         assert_se(hashmap_remove(b->nodes, n->path) == n);
96         free(n->path);
97         free(n);
98 }
99
100 static void bus_free(sd_bus *b) {
101         struct filter_callback *f;
102         struct node *n;
103         unsigned i;
104
105         assert(b);
106
107         bus_close_fds(b);
108
109         if (b->kdbus_buffer)
110                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
111
112         free(b->rbuffer);
113         free(b->unique_name);
114         free(b->auth_buffer);
115         free(b->address);
116         free(b->kernel);
117
118         free(b->exec_path);
119         strv_free(b->exec_argv);
120
121         close_many(b->fds, b->n_fds);
122         free(b->fds);
123
124         for (i = 0; i < b->rqueue_size; i++)
125                 sd_bus_message_unref(b->rqueue[i]);
126         free(b->rqueue);
127
128         for (i = 0; i < b->wqueue_size; i++)
129                 sd_bus_message_unref(b->wqueue[i]);
130         free(b->wqueue);
131
132         hashmap_free_free(b->reply_callbacks);
133         prioq_free(b->reply_callbacks_prioq);
134
135         while ((f = b->filter_callbacks)) {
136                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
137                 free(f);
138         }
139
140         bus_match_free(&b->match_callbacks);
141
142         hashmap_free_free(b->vtable_methods);
143         hashmap_free_free(b->vtable_properties);
144
145         while ((n = hashmap_first(b->nodes)))
146                 bus_node_destroy(b, n);
147
148         hashmap_free(b->nodes);
149
150         bus_kernel_flush_memfd(b);
151
152         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
153
154         free(b);
155 }
156
157 int sd_bus_new(sd_bus **ret) {
158         sd_bus *r;
159
160         assert_return(ret, -EINVAL);
161
162         r = new0(sd_bus, 1);
163         if (!r)
164                 return -ENOMEM;
165
166         r->n_ref = REFCNT_INIT;
167         r->input_fd = r->output_fd = -1;
168         r->message_version = 1;
169         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
170         r->original_pid = getpid();
171
172         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
173
174         /* We guarantee that wqueue always has space for at least one
175          * entry */
176         r->wqueue = new(sd_bus_message*, 1);
177         if (!r->wqueue) {
178                 free(r);
179                 return -ENOMEM;
180         }
181
182         *ret = r;
183         return 0;
184 }
185
186 int sd_bus_set_address(sd_bus *bus, const char *address) {
187         char *a;
188
189         assert_return(bus, -EINVAL);
190         assert_return(bus->state == BUS_UNSET, -EPERM);
191         assert_return(address, -EINVAL);
192         assert_return(!bus_pid_changed(bus), -ECHILD);
193
194         a = strdup(address);
195         if (!a)
196                 return -ENOMEM;
197
198         free(bus->address);
199         bus->address = a;
200
201         return 0;
202 }
203
204 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
205         assert_return(bus, -EINVAL);
206         assert_return(bus->state == BUS_UNSET, -EPERM);
207         assert_return(input_fd >= 0, -EINVAL);
208         assert_return(output_fd >= 0, -EINVAL);
209         assert_return(!bus_pid_changed(bus), -ECHILD);
210
211         bus->input_fd = input_fd;
212         bus->output_fd = output_fd;
213         return 0;
214 }
215
216 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
217         char *p, **a;
218
219         assert_return(bus, -EINVAL);
220         assert_return(bus->state == BUS_UNSET, -EPERM);
221         assert_return(path, -EINVAL);
222         assert_return(!strv_isempty(argv), -EINVAL);
223         assert_return(!bus_pid_changed(bus), -ECHILD);
224
225         p = strdup(path);
226         if (!p)
227                 return -ENOMEM;
228
229         a = strv_copy(argv);
230         if (!a) {
231                 free(p);
232                 return -ENOMEM;
233         }
234
235         free(bus->exec_path);
236         strv_free(bus->exec_argv);
237
238         bus->exec_path = p;
239         bus->exec_argv = a;
240
241         return 0;
242 }
243
244 int sd_bus_set_bus_client(sd_bus *bus, int b) {
245         assert_return(bus, -EINVAL);
246         assert_return(bus->state == BUS_UNSET, -EPERM);
247         assert_return(!bus_pid_changed(bus), -ECHILD);
248
249         bus->bus_client = !!b;
250         return 0;
251 }
252
253 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
254         assert_return(bus, -EINVAL);
255         assert_return(bus->state == BUS_UNSET, -EPERM);
256         assert_return(!bus_pid_changed(bus), -ECHILD);
257
258         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
259         return 0;
260 }
261
262 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
263         assert_return(bus, -EINVAL);
264         assert_return(bus->state == BUS_UNSET, -EPERM);
265         assert_return(!bus_pid_changed(bus), -ECHILD);
266
267         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
268         return 0;
269 }
270
271 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
272         assert_return(bus, -EINVAL);
273         assert_return(bus->state == BUS_UNSET, -EPERM);
274         assert_return(!bus_pid_changed(bus), -ECHILD);
275
276         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
277         return 0;
278 }
279
280 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
281         assert_return(bus, -EINVAL);
282         assert_return(bus->state == BUS_UNSET, -EPERM);
283         assert_return(!bus_pid_changed(bus), -ECHILD);
284
285         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
286         return 0;
287 }
288
289 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
290         assert_return(bus, -EINVAL);
291         assert_return(bus->state == BUS_UNSET, -EPERM);
292         assert_return(!bus_pid_changed(bus), -ECHILD);
293
294         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
295         return 0;
296 }
297
298 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
299         assert_return(bus, -EINVAL);
300         assert_return(bus->state == BUS_UNSET, -EPERM);
301         assert_return(!bus_pid_changed(bus), -ECHILD);
302
303         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
304         return 0;
305 }
306
307 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
308         assert_return(bus, -EINVAL);
309         assert_return(bus->state == BUS_UNSET, -EPERM);
310         assert_return(!bus_pid_changed(bus), -ECHILD);
311
312         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
313         return 0;
314 }
315
316 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
317         assert_return(bus, -EINVAL);
318         assert_return(bus->state == BUS_UNSET, -EPERM);
319         assert_return(!bus_pid_changed(bus), -ECHILD);
320
321         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
322         return 0;
323 }
324
325 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
326         assert_return(bus, -EINVAL);
327         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
328         assert_return(bus->state == BUS_UNSET, -EPERM);
329         assert_return(!bus_pid_changed(bus), -ECHILD);
330
331         bus->is_server = !!b;
332         bus->server_id = server_id;
333         return 0;
334 }
335
336 int sd_bus_set_anonymous(sd_bus *bus, int b) {
337         assert_return(bus, -EINVAL);
338         assert_return(bus->state == BUS_UNSET, -EPERM);
339         assert_return(!bus_pid_changed(bus), -ECHILD);
340
341         bus->anonymous_auth = !!b;
342         return 0;
343 }
344
345 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
346         const char *s;
347         int r;
348
349         assert(bus);
350         assert(bus->state == BUS_HELLO);
351         assert(reply);
352
353         r = bus_message_to_errno(reply);
354         if (r < 0)
355                 return r;
356
357         r = sd_bus_message_read(reply, "s", &s);
358         if (r < 0)
359                 return r;
360
361         if (!service_name_is_valid(s) || s[0] != ':')
362                 return -EBADMSG;
363
364         bus->unique_name = strdup(s);
365         if (!bus->unique_name)
366                 return -ENOMEM;
367
368         bus->state = BUS_RUNNING;
369
370         return 1;
371 }
372
373 static int bus_send_hello(sd_bus *bus) {
374         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
375         int r;
376
377         assert(bus);
378
379         if (!bus->bus_client || bus->is_kernel)
380                 return 0;
381
382         r = sd_bus_message_new_method_call(
383                         bus,
384                         "org.freedesktop.DBus",
385                         "/",
386                         "org.freedesktop.DBus",
387                         "Hello",
388                         &m);
389         if (r < 0)
390                 return r;
391
392         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
393 }
394
395 int bus_start_running(sd_bus *bus) {
396         assert(bus);
397
398         if (bus->bus_client && !bus->is_kernel) {
399                 bus->state = BUS_HELLO;
400                 return 1;
401         }
402
403         bus->state = BUS_RUNNING;
404         return 1;
405 }
406
/* Parses one value out of an address string at *p.
 *
 * If key is non-NULL, *p must start with "<key>="; otherwise 0 is
 * returned and nothing is changed. If it matches but *value is already
 * set, -EINVAL is returned. If key is NULL, the value is taken to start
 * right at *p.
 *
 * The value extends up to the next ';', ',' or end of string. Each
 * "%XY" sequence in it is replaced by the byte built from the two
 * unhexchar() results. On success the decoded, NUL-terminated string
 * replaces *value (the previous one is freed), *p is advanced past the
 * value and past a directly following ',', and 1 is returned.
 *
 * Returns a negative value on failure: whatever unhexchar() reported
 * for a bad escape, or -ENOMEM. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *buf = NULL;
        size_t len = 0;
        int r;

        assert(p);
        assert(*p);
        assert(value);

        cursor = *p;

        if (key) {
                size_t klen = strlen(key);

                if (strncmp(cursor, key, klen) != 0 || cursor[klen] != '=')
                        return 0;

                if (*value)
                        return -EINVAL;

                cursor += klen + 1;
        }

        for (;;) {
                char ch, *grown;

                if (*cursor == ';' || *cursor == ',' || *cursor == 0)
                        break;

                if (*cursor != '%')
                        ch = *cursor++;
                else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                r = hi;
                                goto fail;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                r = lo;
                                goto fail;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the trailing NUL. */
                grown = realloc(buf, len + 2);
                if (!grown) {
                        r = -ENOMEM;
                        goto fail;
                }

                buf = grown;
                buf[len++] = ch;
        }

        if (buf)
                buf[len] = 0;
        else {
                /* Empty value */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;

        free(*value);
        *value = buf;

        return 1;

fail:
        free(buf);
        return r;
}
483
/* Advances *p to just after the next ',' in the string, or to the
 * terminating NUL if the string contains no further ','. */
static void skip_address_key(const char **p) {
        const char *s;

        assert(p);
        assert(*p);

        s = *p;

        while (*s != 0 && *s != ',')
                s++;

        if (*s == ',')
                s++;

        *p = s;
}
493
494 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
495         _cleanup_free_ char *path = NULL, *abstract = NULL;
496         size_t l;
497         int r;
498
499         assert(b);
500         assert(p);
501         assert(*p);
502         assert(guid);
503
504         while (**p != 0 && **p != ';') {
505                 r = parse_address_key(p, "guid", guid);
506                 if (r < 0)
507                         return r;
508                 else if (r > 0)
509                         continue;
510
511                 r = parse_address_key(p, "path", &path);
512                 if (r < 0)
513                         return r;
514                 else if (r > 0)
515                         continue;
516
517                 r = parse_address_key(p, "abstract", &abstract);
518                 if (r < 0)
519                         return r;
520                 else if (r > 0)
521                         continue;
522
523                 skip_address_key(p);
524         }
525
526         if (!path && !abstract)
527                 return -EINVAL;
528
529         if (path && abstract)
530                 return -EINVAL;
531
532         if (path) {
533                 l = strlen(path);
534                 if (l > sizeof(b->sockaddr.un.sun_path))
535                         return -E2BIG;
536
537                 b->sockaddr.un.sun_family = AF_UNIX;
538                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
539                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
540         } else if (abstract) {
541                 l = strlen(abstract);
542                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
543                         return -E2BIG;
544
545                 b->sockaddr.un.sun_family = AF_UNIX;
546                 b->sockaddr.un.sun_path[0] = 0;
547                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
548                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
549         }
550
551         return 0;
552 }
553
554 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
555         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
556         int r;
557         struct addrinfo *result, hints = {
558                 .ai_socktype = SOCK_STREAM,
559                 .ai_flags = AI_ADDRCONFIG,
560         };
561
562         assert(b);
563         assert(p);
564         assert(*p);
565         assert(guid);
566
567         while (**p != 0 && **p != ';') {
568                 r = parse_address_key(p, "guid", guid);
569                 if (r < 0)
570                         return r;
571                 else if (r > 0)
572                         continue;
573
574                 r = parse_address_key(p, "host", &host);
575                 if (r < 0)
576                         return r;
577                 else if (r > 0)
578                         continue;
579
580                 r = parse_address_key(p, "port", &port);
581                 if (r < 0)
582                         return r;
583                 else if (r > 0)
584                         continue;
585
586                 r = parse_address_key(p, "family", &family);
587                 if (r < 0)
588                         return r;
589                 else if (r > 0)
590                         continue;
591
592                 skip_address_key(p);
593         }
594
595         if (!host || !port)
596                 return -EINVAL;
597
598         if (family) {
599                 if (streq(family, "ipv4"))
600                         hints.ai_family = AF_INET;
601                 else if (streq(family, "ipv6"))
602                         hints.ai_family = AF_INET6;
603                 else
604                         return -EINVAL;
605         }
606
607         r = getaddrinfo(host, port, &hints, &result);
608         if (r == EAI_SYSTEM)
609                 return -errno;
610         else if (r != 0)
611                 return -EADDRNOTAVAIL;
612
613         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
614         b->sockaddr_size = result->ai_addrlen;
615
616         freeaddrinfo(result);
617
618         return 0;
619 }
620
621 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
622         char *path = NULL;
623         unsigned n_argv = 0, j;
624         char **argv = NULL;
625         int r;
626
627         assert(b);
628         assert(p);
629         assert(*p);
630         assert(guid);
631
632         while (**p != 0 && **p != ';') {
633                 r = parse_address_key(p, "guid", guid);
634                 if (r < 0)
635                         goto fail;
636                 else if (r > 0)
637                         continue;
638
639                 r = parse_address_key(p, "path", &path);
640                 if (r < 0)
641                         goto fail;
642                 else if (r > 0)
643                         continue;
644
645                 if (startswith(*p, "argv")) {
646                         unsigned ul;
647
648                         errno = 0;
649                         ul = strtoul(*p + 4, (char**) p, 10);
650                         if (errno > 0 || **p != '=' || ul > 256) {
651                                 r = -EINVAL;
652                                 goto fail;
653                         }
654
655                         (*p) ++;
656
657                         if (ul >= n_argv) {
658                                 char **x;
659
660                                 x = realloc(argv, sizeof(char*) * (ul + 2));
661                                 if (!x) {
662                                         r = -ENOMEM;
663                                         goto fail;
664                                 }
665
666                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
667
668                                 argv = x;
669                                 n_argv = ul + 1;
670                         }
671
672                         r = parse_address_key(p, NULL, argv + ul);
673                         if (r < 0)
674                                 goto fail;
675
676                         continue;
677                 }
678
679                 skip_address_key(p);
680         }
681
682         if (!path) {
683                 r = -EINVAL;
684                 goto fail;
685         }
686
687         /* Make sure there are no holes in the array, with the
688          * exception of argv[0] */
689         for (j = 1; j < n_argv; j++)
690                 if (!argv[j]) {
691                         r = -EINVAL;
692                         goto fail;
693                 }
694
695         if (argv && argv[0] == NULL) {
696                 argv[0] = strdup(path);
697                 if (!argv[0]) {
698                         r = -ENOMEM;
699                         goto fail;
700                 }
701         }
702
703         b->exec_path = path;
704         b->exec_argv = argv;
705         return 0;
706
707 fail:
708         for (j = 0; j < n_argv; j++)
709                 free(argv[j]);
710
711         free(argv);
712         free(path);
713         return r;
714 }
715
716 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
717         _cleanup_free_ char *path = NULL;
718         int r;
719
720         assert(b);
721         assert(p);
722         assert(*p);
723         assert(guid);
724
725         while (**p != 0 && **p != ';') {
726                 r = parse_address_key(p, "guid", guid);
727                 if (r < 0)
728                         return r;
729                 else if (r > 0)
730                         continue;
731
732                 r = parse_address_key(p, "path", &path);
733                 if (r < 0)
734                         return r;
735                 else if (r > 0)
736                         continue;
737
738                 skip_address_key(p);
739         }
740
741         if (!path)
742                 return -EINVAL;
743
744         free(b->kernel);
745         b->kernel = path;
746         path = NULL;
747
748         return 0;
749 }
750
751 static void bus_reset_parsed_address(sd_bus *b) {
752         assert(b);
753
754         zero(b->sockaddr);
755         b->sockaddr_size = 0;
756         strv_free(b->exec_argv);
757         free(b->exec_path);
758         b->exec_path = NULL;
759         b->exec_argv = NULL;
760         b->server_id = SD_ID128_NULL;
761         free(b->kernel);
762         b->kernel = NULL;
763 }
764
765 static int bus_parse_next_address(sd_bus *b) {
766         _cleanup_free_ char *guid = NULL;
767         const char *a;
768         int r;
769
770         assert(b);
771
772         if (!b->address)
773                 return 0;
774         if (b->address[b->address_index] == 0)
775                 return 0;
776
777         bus_reset_parsed_address(b);
778
779         a = b->address + b->address_index;
780
781         while (*a != 0) {
782
783                 if (*a == ';') {
784                         a++;
785                         continue;
786                 }
787
788                 if (startswith(a, "unix:")) {
789                         a += 5;
790
791                         r = parse_unix_address(b, &a, &guid);
792                         if (r < 0)
793                                 return r;
794                         break;
795
796                 } else if (startswith(a, "tcp:")) {
797
798                         a += 4;
799                         r = parse_tcp_address(b, &a, &guid);
800                         if (r < 0)
801                                 return r;
802
803                         break;
804
805                 } else if (startswith(a, "unixexec:")) {
806
807                         a += 9;
808                         r = parse_exec_address(b, &a, &guid);
809                         if (r < 0)
810                                 return r;
811
812                         break;
813
814                 } else if (startswith(a, "kernel:")) {
815
816                         a += 7;
817                         r = parse_kernel_address(b, &a, &guid);
818                         if (r < 0)
819                                 return r;
820
821                         break;
822                 }
823
824                 a = strchr(a, ';');
825                 if (!a)
826                         return 0;
827         }
828
829         if (guid) {
830                 r = sd_id128_from_string(guid, &b->server_id);
831                 if (r < 0)
832                         return r;
833         }
834
835         b->address_index = a - b->address;
836         return 1;
837 }
838
839 static int bus_start_address(sd_bus *b) {
840         int r;
841
842         assert(b);
843
844         for (;;) {
845                 sd_bus_close(b);
846
847                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
848
849                         r = bus_socket_connect(b);
850                         if (r >= 0)
851                                 return r;
852
853                         b->last_connect_error = -r;
854
855                 } else if (b->exec_path) {
856
857                         r = bus_socket_exec(b);
858                         if (r >= 0)
859                                 return r;
860
861                         b->last_connect_error = -r;
862                 } else if (b->kernel) {
863
864                         r = bus_kernel_connect(b);
865                         if (r >= 0)
866                                 return r;
867
868                         b->last_connect_error = -r;
869                 }
870
871                 r = bus_parse_next_address(b);
872                 if (r < 0)
873                         return r;
874                 if (r == 0)
875                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
876         }
877 }
878
879 int bus_next_address(sd_bus *b) {
880         assert(b);
881
882         bus_reset_parsed_address(b);
883         return bus_start_address(b);
884 }
885
886 static int bus_start_fd(sd_bus *b) {
887         struct stat st;
888         int r;
889
890         assert(b);
891         assert(b->input_fd >= 0);
892         assert(b->output_fd >= 0);
893
894         r = fd_nonblock(b->input_fd, true);
895         if (r < 0)
896                 return r;
897
898         r = fd_cloexec(b->input_fd, true);
899         if (r < 0)
900                 return r;
901
902         if (b->input_fd != b->output_fd) {
903                 r = fd_nonblock(b->output_fd, true);
904                 if (r < 0)
905                         return r;
906
907                 r = fd_cloexec(b->output_fd, true);
908                 if (r < 0)
909                         return r;
910         }
911
912         if (fstat(b->input_fd, &st) < 0)
913                 return -errno;
914
915         if (S_ISCHR(b->input_fd))
916                 return bus_kernel_take_fd(b);
917         else
918                 return bus_socket_take_fd(b);
919 }
920
921 int sd_bus_start(sd_bus *bus) {
922         int r;
923
924         assert_return(bus, -EINVAL);
925         assert_return(bus->state == BUS_UNSET, -EPERM);
926         assert_return(!bus_pid_changed(bus), -ECHILD);
927
928         bus->state = BUS_OPENING;
929
930         if (bus->is_server && bus->bus_client)
931                 return -EINVAL;
932
933         if (bus->input_fd >= 0)
934                 r = bus_start_fd(bus);
935         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
936                 r = bus_start_address(bus);
937         else
938                 return -EINVAL;
939
940         if (r < 0)
941                 return r;
942
943         return bus_send_hello(bus);
944 }
945
946 int sd_bus_open_system(sd_bus **ret) {
947         const char *e;
948         sd_bus *b;
949         int r;
950
951         assert_return(ret, -EINVAL);
952
953         r = sd_bus_new(&b);
954         if (r < 0)
955                 return r;
956
957         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
958         if (e) {
959                 r = sd_bus_set_address(b, e);
960                 if (r < 0)
961                         goto fail;
962         } else {
963                 b->sockaddr.un.sun_family = AF_UNIX;
964                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
965                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
966         }
967
968         b->bus_client = true;
969
970         r = sd_bus_start(b);
971         if (r < 0)
972                 goto fail;
973
974         *ret = b;
975         return 0;
976
977 fail:
978         bus_free(b);
979         return r;
980 }
981
982 int sd_bus_open_user(sd_bus **ret) {
983         const char *e;
984         sd_bus *b;
985         size_t l;
986         int r;
987
988         assert_return(ret, -EINVAL);
989
990         r = sd_bus_new(&b);
991         if (r < 0)
992                 return r;
993
994         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
995         if (e) {
996                 r = sd_bus_set_address(b, e);
997                 if (r < 0)
998                         goto fail;
999         } else {
1000                 e = secure_getenv("XDG_RUNTIME_DIR");
1001                 if (!e) {
1002                         r = -ENOENT;
1003                         goto fail;
1004                 }
1005
1006                 l = strlen(e);
1007                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1008                         r = -E2BIG;
1009                         goto fail;
1010                 }
1011
1012                 b->sockaddr.un.sun_family = AF_UNIX;
1013                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1014                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1015         }
1016
1017         b->bus_client = true;
1018
1019         r = sd_bus_start(b);
1020         if (r < 0)
1021                 goto fail;
1022
1023         *ret = b;
1024         return 0;
1025
1026 fail:
1027         bus_free(b);
1028         return r;
1029 }
1030
1031 void sd_bus_close(sd_bus *bus) {
1032         if (!bus)
1033                 return;
1034         if (bus->state == BUS_CLOSED)
1035                 return;
1036         if (bus_pid_changed(bus))
1037                 return;
1038
1039         bus->state = BUS_CLOSED;
1040
1041         if (!bus->is_kernel)
1042                 bus_close_fds(bus);
1043
1044         /* We'll leave the fd open in case this is a kernel bus, since
1045          * there might still be memblocks around that reference this
1046          * bus, and they might need to invoke the
1047          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1048          * freed. */
1049 }
1050
1051 sd_bus *sd_bus_ref(sd_bus *bus) {
1052         if (!bus)
1053                 return NULL;
1054
1055         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1056
1057         return bus;
1058 }
1059
1060 sd_bus *sd_bus_unref(sd_bus *bus) {
1061         if (!bus)
1062                 return NULL;
1063
1064         if (REFCNT_DEC(bus->n_ref) <= 0)
1065                 bus_free(bus);
1066
1067         return NULL;
1068 }
1069
1070 int sd_bus_is_open(sd_bus *bus) {
1071
1072         assert_return(bus, -EINVAL);
1073         assert_return(!bus_pid_changed(bus), -ECHILD);
1074
1075         return BUS_IS_OPEN(bus->state);
1076 }
1077
1078 int sd_bus_can_send(sd_bus *bus, char type) {
1079         int r;
1080
1081         assert_return(bus, -EINVAL);
1082         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1083         assert_return(!bus_pid_changed(bus), -ECHILD);
1084
1085         if (type == SD_BUS_TYPE_UNIX_FD) {
1086                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1087                         return 0;
1088
1089                 r = bus_ensure_running(bus);
1090                 if (r < 0)
1091                         return r;
1092
1093                 return bus->can_fds;
1094         }
1095
1096         return bus_type_is_valid(type);
1097 }
1098
1099 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1100         int r;
1101
1102         assert_return(bus, -EINVAL);
1103         assert_return(server_id, -EINVAL);
1104         assert_return(!bus_pid_changed(bus), -ECHILD);
1105
1106         r = bus_ensure_running(bus);
1107         if (r < 0)
1108                 return r;
1109
1110         *server_id = bus->server_id;
1111         return 0;
1112 }
1113
1114 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1115         assert(m);
1116
1117         if (m->header->version > b->message_version)
1118                 return -EPERM;
1119
1120         if (m->sealed)
1121                 return 0;
1122
1123         return bus_message_seal(m, ++b->serial);
1124 }
1125
1126 static int dispatch_wqueue(sd_bus *bus) {
1127         int r, ret = 0;
1128
1129         assert(bus);
1130         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1131
1132         while (bus->wqueue_size > 0) {
1133
1134                 if (bus->is_kernel)
1135                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1136                 else
1137                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1138
1139                 if (r < 0) {
1140                         sd_bus_close(bus);
1141                         return r;
1142                 } else if (r == 0)
1143                         /* Didn't do anything this time */
1144                         return ret;
1145                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1146                         /* Fully written. Let's drop the entry from
1147                          * the queue.
1148                          *
1149                          * This isn't particularly optimized, but
1150                          * well, this is supposed to be our worst-case
1151                          * buffer only, and the socket buffer is
1152                          * supposed to be our primary buffer, and if
1153                          * it got full, then all bets are off
1154                          * anyway. */
1155
1156                         sd_bus_message_unref(bus->wqueue[0]);
1157                         bus->wqueue_size --;
1158                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1159                         bus->windex = 0;
1160
1161                         ret = 1;
1162                 }
1163         }
1164
1165         return ret;
1166 }
1167
1168 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1169         sd_bus_message *z = NULL;
1170         int r, ret = 0;
1171
1172         assert(bus);
1173         assert(m);
1174         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1175
1176         if (bus->rqueue_size > 0) {
1177                 /* Dispatch a queued message */
1178
1179                 *m = bus->rqueue[0];
1180                 bus->rqueue_size --;
1181                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1182                 return 1;
1183         }
1184
1185         /* Try to read a new message */
1186         do {
1187                 if (bus->is_kernel)
1188                         r = bus_kernel_read_message(bus, &z);
1189                 else
1190                         r = bus_socket_read_message(bus, &z);
1191
1192                 if (r < 0) {
1193                         sd_bus_close(bus);
1194                         return r;
1195                 }
1196                 if (r == 0)
1197                         return ret;
1198
1199                 ret = 1;
1200         } while (!z);
1201
1202         *m = z;
1203         return ret;
1204 }
1205
1206 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1207         int r;
1208
1209         assert_return(bus, -EINVAL);
1210         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1211         assert_return(m, -EINVAL);
1212         assert_return(!bus_pid_changed(bus), -ECHILD);
1213
1214         if (m->n_fds > 0) {
1215                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1216                 if (r < 0)
1217                         return r;
1218                 if (r == 0)
1219                         return -ENOTSUP;
1220         }
1221
1222         /* If the serial number isn't kept, then we know that no reply
1223          * is expected */
1224         if (!serial && !m->sealed)
1225                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1226
1227         r = bus_seal_message(bus, m);
1228         if (r < 0)
1229                 return r;
1230
1231         /* If this is a reply and no reply was requested, then let's
1232          * suppress this, if we can */
1233         if (m->dont_send && !serial)
1234                 return 0;
1235
1236         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1237                 size_t idx = 0;
1238
1239                 if (bus->is_kernel)
1240                         r = bus_kernel_write_message(bus, m);
1241                 else
1242                         r = bus_socket_write_message(bus, m, &idx);
1243
1244                 if (r < 0) {
1245                         sd_bus_close(bus);
1246                         return r;
1247                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1248                         /* Wasn't fully written. So let's remember how
1249                          * much was written. Note that the first entry
1250                          * of the wqueue array is always allocated so
1251                          * that we always can remember how much was
1252                          * written. */
1253                         bus->wqueue[0] = sd_bus_message_ref(m);
1254                         bus->wqueue_size = 1;
1255                         bus->windex = idx;
1256                 }
1257         } else {
1258                 sd_bus_message **q;
1259
1260                 /* Just append it to the queue. */
1261
1262                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1263                         return -ENOBUFS;
1264
1265                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1266                 if (!q)
1267                         return -ENOMEM;
1268
1269                 bus->wqueue = q;
1270                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1271         }
1272
1273         if (serial)
1274                 *serial = BUS_MESSAGE_SERIAL(m);
1275
1276         return 0;
1277 }
1278
1279 static usec_t calc_elapse(uint64_t usec) {
1280         if (usec == (uint64_t) -1)
1281                 return 0;
1282
1283         if (usec == 0)
1284                 usec = BUS_DEFAULT_TIMEOUT;
1285
1286         return now(CLOCK_MONOTONIC) + usec;
1287 }
1288
1289 static int timeout_compare(const void *a, const void *b) {
1290         const struct reply_callback *x = a, *y = b;
1291
1292         if (x->timeout != 0 && y->timeout == 0)
1293                 return -1;
1294
1295         if (x->timeout == 0 && y->timeout != 0)
1296                 return 1;
1297
1298         if (x->timeout < y->timeout)
1299                 return -1;
1300
1301         if (x->timeout > y->timeout)
1302                 return 1;
1303
1304         return 0;
1305 }
1306
1307 int sd_bus_send_with_reply(
1308                 sd_bus *bus,
1309                 sd_bus_message *m,
1310                 sd_bus_message_handler_t callback,
1311                 void *userdata,
1312                 uint64_t usec,
1313                 uint64_t *serial) {
1314
1315         struct reply_callback *c;
1316         int r;
1317
1318         assert_return(bus, -EINVAL);
1319         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1320         assert_return(m, -EINVAL);
1321         assert_return(m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL, -EINVAL);
1322         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1323         assert_return(callback, -EINVAL);
1324         assert_return(!bus_pid_changed(bus), -ECHILD);
1325
1326         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1327         if (r < 0)
1328                 return r;
1329
1330         if (usec != (uint64_t) -1) {
1331                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1332                 if (r < 0)
1333                         return r;
1334         }
1335
1336         r = bus_seal_message(bus, m);
1337         if (r < 0)
1338                 return r;
1339
1340         c = new0(struct reply_callback, 1);
1341         if (!c)
1342                 return -ENOMEM;
1343
1344         c->callback = callback;
1345         c->userdata = userdata;
1346         c->serial = BUS_MESSAGE_SERIAL(m);
1347         c->timeout = calc_elapse(usec);
1348
1349         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1350         if (r < 0) {
1351                 free(c);
1352                 return r;
1353         }
1354
1355         if (c->timeout != 0) {
1356                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1357                 if (r < 0) {
1358                         c->timeout = 0;
1359                         sd_bus_send_with_reply_cancel(bus, c->serial);
1360                         return r;
1361                 }
1362         }
1363
1364         r = sd_bus_send(bus, m, serial);
1365         if (r < 0) {
1366                 sd_bus_send_with_reply_cancel(bus, c->serial);
1367                 return r;
1368         }
1369
1370         return r;
1371 }
1372
1373 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1374         struct reply_callback *c;
1375
1376         assert_return(bus, -EINVAL);
1377         assert_return(serial != 0, -EINVAL);
1378         assert_return(!bus_pid_changed(bus), -ECHILD);
1379
1380         c = hashmap_remove(bus->reply_callbacks, &serial);
1381         if (!c)
1382                 return 0;
1383
1384         if (c->timeout != 0)
1385                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1386
1387         free(c);
1388         return 1;
1389 }
1390
1391 int bus_ensure_running(sd_bus *bus) {
1392         int r;
1393
1394         assert(bus);
1395
1396         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1397                 return -ENOTCONN;
1398         if (bus->state == BUS_RUNNING)
1399                 return 1;
1400
1401         for (;;) {
1402                 r = sd_bus_process(bus, NULL);
1403                 if (r < 0)
1404                         return r;
1405                 if (bus->state == BUS_RUNNING)
1406                         return 1;
1407                 if (r > 0)
1408                         continue;
1409
1410                 r = sd_bus_wait(bus, (uint64_t) -1);
1411                 if (r < 0)
1412                         return r;
1413         }
1414 }
1415
1416 int sd_bus_send_with_reply_and_block(
1417                 sd_bus *bus,
1418                 sd_bus_message *m,
1419                 uint64_t usec,
1420                 sd_bus_error *error,
1421                 sd_bus_message **reply) {
1422
1423         int r;
1424         usec_t timeout;
1425         uint64_t serial;
1426         bool room = false;
1427
1428         assert_return(bus, -EINVAL);
1429         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1430         assert_return(m, -EINVAL);
1431         assert_return(m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL, -EINVAL);
1432         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1433         assert_return(!bus_error_is_dirty(error), -EINVAL);
1434         assert_return(!bus_pid_changed(bus), -ECHILD);
1435
1436         r = bus_ensure_running(bus);
1437         if (r < 0)
1438                 return r;
1439
1440         r = sd_bus_send(bus, m, &serial);
1441         if (r < 0)
1442                 return r;
1443
1444         timeout = calc_elapse(usec);
1445
1446         for (;;) {
1447                 usec_t left;
1448                 sd_bus_message *incoming = NULL;
1449
1450                 if (!room) {
1451                         sd_bus_message **q;
1452
1453                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1454                                 return -ENOBUFS;
1455
1456                         /* Make sure there's room for queuing this
1457                          * locally, before we read the message */
1458
1459                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1460                         if (!q)
1461                                 return -ENOMEM;
1462
1463                         bus->rqueue = q;
1464                         room = true;
1465                 }
1466
1467                 if (bus->is_kernel)
1468                         r = bus_kernel_read_message(bus, &incoming);
1469                 else
1470                         r = bus_socket_read_message(bus, &incoming);
1471                 if (r < 0)
1472                         return r;
1473                 if (incoming) {
1474
1475                         if (incoming->reply_serial == serial) {
1476                                 /* Found a match! */
1477
1478                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1479
1480                                         if (reply)
1481                                                 *reply = incoming;
1482                                         else
1483                                                 sd_bus_message_unref(incoming);
1484
1485                                         return 0;
1486                                 }
1487
1488                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1489                                         int k;
1490
1491                                         r = sd_bus_error_copy(error, &incoming->error);
1492                                         if (r < 0) {
1493                                                 sd_bus_message_unref(incoming);
1494                                                 return r;
1495                                         }
1496
1497                                         k = bus_error_to_errno(&incoming->error);
1498                                         sd_bus_message_unref(incoming);
1499                                         return k;
1500                                 }
1501
1502                                 sd_bus_message_unref(incoming);
1503                                 return -EIO;
1504                         }
1505
1506                         /* There's already guaranteed to be room for
1507                          * this, so need to resize things here */
1508                         bus->rqueue[bus->rqueue_size ++] = incoming;
1509                         room = false;
1510
1511                         /* Try to read more, right-away */
1512                         continue;
1513                 }
1514                 if (r != 0)
1515                         continue;
1516
1517                 if (timeout > 0) {
1518                         usec_t n;
1519
1520                         n = now(CLOCK_MONOTONIC);
1521                         if (n >= timeout)
1522                                 return -ETIMEDOUT;
1523
1524                         left = timeout - n;
1525                 } else
1526                         left = (uint64_t) -1;
1527
1528                 r = bus_poll(bus, true, left);
1529                 if (r < 0)
1530                         return r;
1531
1532                 r = dispatch_wqueue(bus);
1533                 if (r < 0)
1534                         return r;
1535         }
1536 }
1537
1538 int sd_bus_get_fd(sd_bus *bus) {
1539
1540         assert_return(bus, -EINVAL);
1541         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1542         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1543         assert_return(!bus_pid_changed(bus), -ECHILD);
1544
1545         return bus->input_fd;
1546 }
1547
1548 int sd_bus_get_events(sd_bus *bus) {
1549         int flags = 0;
1550
1551         assert_return(bus, -EINVAL);
1552         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1553         assert_return(!bus_pid_changed(bus), -ECHILD);
1554
1555         if (bus->state == BUS_OPENING)
1556                 flags |= POLLOUT;
1557         else if (bus->state == BUS_AUTHENTICATING) {
1558
1559                 if (bus_socket_auth_needs_write(bus))
1560                         flags |= POLLOUT;
1561
1562                 flags |= POLLIN;
1563
1564         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1565                 if (bus->rqueue_size <= 0)
1566                         flags |= POLLIN;
1567                 if (bus->wqueue_size > 0)
1568                         flags |= POLLOUT;
1569         }
1570
1571         return flags;
1572 }
1573
1574 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1575         struct reply_callback *c;
1576
1577         assert_return(bus, -EINVAL);
1578         assert_return(timeout_usec, -EINVAL);
1579         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1580         assert_return(!bus_pid_changed(bus), -ECHILD);
1581
1582         if (bus->state == BUS_AUTHENTICATING) {
1583                 *timeout_usec = bus->auth_timeout;
1584                 return 1;
1585         }
1586
1587         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1588                 *timeout_usec = (uint64_t) -1;
1589                 return 0;
1590         }
1591
1592         if (bus->rqueue_size > 0) {
1593                 *timeout_usec = 0;
1594                 return 1;
1595         }
1596
1597         c = prioq_peek(bus->reply_callbacks_prioq);
1598         if (!c) {
1599                 *timeout_usec = (uint64_t) -1;
1600                 return 0;
1601         }
1602
1603         *timeout_usec = c->timeout;
1604         return 1;
1605 }
1606
1607 static int process_timeout(sd_bus *bus) {
1608         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1609         struct reply_callback *c;
1610         usec_t n;
1611         int r;
1612
1613         assert(bus);
1614
1615         c = prioq_peek(bus->reply_callbacks_prioq);
1616         if (!c)
1617                 return 0;
1618
1619         n = now(CLOCK_MONOTONIC);
1620         if (c->timeout > n)
1621                 return 0;
1622
1623         r = bus_message_new_synthetic_error(
1624                         bus,
1625                         c->serial,
1626                         &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1627                         &m);
1628         if (r < 0)
1629                 return r;
1630
1631         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1632         hashmap_remove(bus->reply_callbacks, &c->serial);
1633
1634         r = c->callback(bus, m, c->userdata);
1635         free(c);
1636
1637         return r < 0 ? r : 1;
1638 }
1639
1640 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1641         assert(bus);
1642         assert(m);
1643
1644         if (bus->state != BUS_HELLO)
1645                 return 0;
1646
1647         /* Let's make sure the first message on the bus is the HELLO
1648          * reply. But note that we don't actually parse the message
1649          * here (we leave that to the usual handling), we just verify
1650          * we don't let any earlier msg through. */
1651
1652         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1653             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1654                 return -EIO;
1655
1656         if (m->reply_serial != bus->hello_serial)
1657                 return -EIO;
1658
1659         return 0;
1660 }
1661
1662 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1663         struct reply_callback *c;
1664         int r;
1665
1666         assert(bus);
1667         assert(m);
1668
1669         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1670             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1671                 return 0;
1672
1673         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1674         if (!c)
1675                 return 0;
1676
1677         if (c->timeout != 0)
1678                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1679
1680         r = sd_bus_message_rewind(m, true);
1681         if (r < 0)
1682                 return r;
1683
1684         r = c->callback(bus, m, c->userdata);
1685         free(c);
1686
1687         return r;
1688 }
1689
1690 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1691         struct filter_callback *l;
1692         int r;
1693
1694         assert(bus);
1695         assert(m);
1696
1697         do {
1698                 bus->filter_callbacks_modified = false;
1699
1700                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1701
1702                         if (bus->filter_callbacks_modified)
1703                                 break;
1704
1705                         /* Don't run this more than once per iteration */
1706                         if (l->last_iteration == bus->iteration_counter)
1707                                 continue;
1708
1709                         l->last_iteration = bus->iteration_counter;
1710
1711                         r = sd_bus_message_rewind(m, true);
1712                         if (r < 0)
1713                                 return r;
1714
1715                         r = l->callback(bus, m, l->userdata);
1716                         if (r != 0)
1717                                 return r;
1718
1719                 }
1720
1721         } while (bus->filter_callbacks_modified);
1722
1723         return 0;
1724 }
1725
1726 static int process_match(sd_bus *bus, sd_bus_message *m) {
1727         int r;
1728
1729         assert(bus);
1730         assert(m);
1731
1732         do {
1733                 bus->match_callbacks_modified = false;
1734
1735                 r = bus_match_run(bus, &bus->match_callbacks, m);
1736                 if (r != 0)
1737                         return r;
1738
1739         } while (bus->match_callbacks_modified);
1740
1741         return 0;
1742 }
1743
1744 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1745         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1746         int r;
1747
1748         assert(bus);
1749         assert(m);
1750
1751         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1752                 return 0;
1753
1754         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1755                 return 0;
1756
1757         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1758                 return 1;
1759
1760         if (streq_ptr(m->member, "Ping"))
1761                 r = sd_bus_message_new_method_return(bus, m, &reply);
1762         else if (streq_ptr(m->member, "GetMachineId")) {
1763                 sd_id128_t id;
1764                 char sid[33];
1765
1766                 r = sd_id128_get_machine(&id);
1767                 if (r < 0)
1768                         return r;
1769
1770                 r = sd_bus_message_new_method_return(bus, m, &reply);
1771                 if (r < 0)
1772                         return r;
1773
1774                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1775         } else {
1776                 r = sd_bus_message_new_method_errorf(
1777                                 bus, m, &reply,
1778                                 "org.freedesktop.DBus.Error.UnknownMethod",
1779                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1780         }
1781
1782         if (r < 0)
1783                 return r;
1784
1785         r = sd_bus_send(bus, reply, NULL);
1786         if (r < 0)
1787                 return r;
1788
1789         return 1;
1790 }
1791
1792 static int process_message(sd_bus *bus, sd_bus_message *m) {
1793         int r;
1794
1795         assert(bus);
1796         assert(m);
1797
1798         bus->iteration_counter++;
1799
1800         r = process_hello(bus, m);
1801         if (r != 0)
1802                 return r;
1803
1804         r = process_reply(bus, m);
1805         if (r != 0)
1806                 return r;
1807
1808         r = process_filter(bus, m);
1809         if (r != 0)
1810                 return r;
1811
1812         r = process_match(bus, m);
1813         if (r != 0)
1814                 return r;
1815
1816         r = process_builtin(bus, m);
1817         if (r != 0)
1818                 return r;
1819
1820         return bus_process_object(bus, m);
1821 }
1822
1823 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1824         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1825         int r;
1826
1827         assert(bus);
1828         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1829
1830         r = process_timeout(bus);
1831         if (r != 0)
1832                 goto null_message;
1833
1834         r = dispatch_wqueue(bus);
1835         if (r != 0)
1836                 goto null_message;
1837
1838         r = dispatch_rqueue(bus, &m);
1839         if (r < 0)
1840                 return r;
1841         if (!m)
1842                 goto null_message;
1843
1844         r = process_message(bus, m);
1845         if (r != 0)
1846                 goto null_message;
1847
1848         if (ret) {
1849                 r = sd_bus_message_rewind(m, true);
1850                 if (r < 0)
1851                         return r;
1852
1853                 *ret = m;
1854                 m = NULL;
1855                 return 1;
1856         }
1857
1858         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1859
1860                 r = sd_bus_reply_method_errorf(
1861                                 bus, m,
1862                                 "org.freedesktop.DBus.Error.UnknownObject",
1863                                 "Unknown object '%s'.", m->path);
1864                 if (r < 0)
1865                         return r;
1866         }
1867
1868         return 1;
1869
1870 null_message:
1871         if (r >= 0 && ret)
1872                 *ret = NULL;
1873
1874         return r;
1875 }
1876
1877 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1878         BUS_DONT_DESTROY(bus);
1879         int r;
1880
1881         /* Returns 0 when we didn't do anything. This should cause the
1882          * caller to invoke sd_bus_wait() before returning the next
1883          * time. Returns > 0 when we did something, which possibly
1884          * means *ret is filled in with an unprocessed message. */
1885
1886         assert_return(bus, -EINVAL);
1887         assert_return(!bus_pid_changed(bus), -ECHILD);
1888
1889         /* We don't allow recursively invoking sd_bus_process(). */
1890         assert_return(!bus->processing, -EBUSY);
1891
1892         switch (bus->state) {
1893
1894         case BUS_UNSET:
1895         case BUS_CLOSED:
1896                 return -ENOTCONN;
1897
1898         case BUS_OPENING:
1899                 r = bus_socket_process_opening(bus);
1900                 if (r < 0)
1901                         return r;
1902                 if (ret)
1903                         *ret = NULL;
1904                 return r;
1905
1906         case BUS_AUTHENTICATING:
1907
1908                 r = bus_socket_process_authenticating(bus);
1909                 if (r < 0)
1910                         return r;
1911                 if (ret)
1912                         *ret = NULL;
1913                 return r;
1914
1915         case BUS_RUNNING:
1916         case BUS_HELLO:
1917
1918                 bus->processing = true;
1919                 r = process_running(bus, ret);
1920                 bus->processing = false;
1921
1922                 return r;
1923         }
1924
1925         assert_not_reached("Unknown state");
1926 }
1927
1928 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1929         struct pollfd p[2] = {};
1930         int r, e, n;
1931         struct timespec ts;
1932         usec_t m = (usec_t) -1;
1933
1934         assert(bus);
1935         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1936
1937         e = sd_bus_get_events(bus);
1938         if (e < 0)
1939                 return e;
1940
1941         if (need_more)
1942                 /* The caller really needs some more data, he doesn't
1943                  * care about what's already read, or any timeouts
1944                  * except its own.*/
1945                 e |= POLLIN;
1946         else {
1947                 usec_t until;
1948                 /* The caller wants to process if there's something to
1949                  * process, but doesn't care otherwise */
1950
1951                 r = sd_bus_get_timeout(bus, &until);
1952                 if (r < 0)
1953                         return r;
1954                 if (r > 0) {
1955                         usec_t nw;
1956                         nw = now(CLOCK_MONOTONIC);
1957                         m = until > nw ? until - nw : 0;
1958                 }
1959         }
1960
1961         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1962                 m = timeout_usec;
1963
1964         p[0].fd = bus->input_fd;
1965         if (bus->output_fd == bus->input_fd) {
1966                 p[0].events = e;
1967                 n = 1;
1968         } else {
1969                 p[0].events = e & POLLIN;
1970                 p[1].fd = bus->output_fd;
1971                 p[1].events = e & POLLOUT;
1972                 n = 2;
1973         }
1974
1975         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1976         if (r < 0)
1977                 return -errno;
1978
1979         return r > 0 ? 1 : 0;
1980 }
1981
1982 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1983
1984         assert_return(bus, -EINVAL);
1985         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1986         assert_return(!bus_pid_changed(bus), -ECHILD);
1987
1988         if (bus->rqueue_size > 0)
1989                 return 0;
1990
1991         return bus_poll(bus, false, timeout_usec);
1992 }
1993
1994 int sd_bus_flush(sd_bus *bus) {
1995         int r;
1996
1997         assert_return(bus, -EINVAL);
1998         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1999         assert_return(!bus_pid_changed(bus), -ECHILD);
2000
2001         r = bus_ensure_running(bus);
2002         if (r < 0)
2003                 return r;
2004
2005         if (bus->wqueue_size <= 0)
2006                 return 0;
2007
2008         for (;;) {
2009                 r = dispatch_wqueue(bus);
2010                 if (r < 0)
2011                         return r;
2012
2013                 if (bus->wqueue_size <= 0)
2014                         return 0;
2015
2016                 r = bus_poll(bus, false, (uint64_t) -1);
2017                 if (r < 0)
2018                         return r;
2019         }
2020 }
2021
2022 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2023         struct filter_callback *f;
2024
2025         assert_return(bus, -EINVAL);
2026         assert_return(callback, -EINVAL);
2027         assert_return(!bus_pid_changed(bus), -ECHILD);
2028
2029         f = new0(struct filter_callback, 1);
2030         if (!f)
2031                 return -ENOMEM;
2032         f->callback = callback;
2033         f->userdata = userdata;
2034
2035         bus->filter_callbacks_modified = true;
2036         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2037         return 0;
2038 }
2039
2040 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2041         struct filter_callback *f;
2042
2043         assert_return(bus, -EINVAL);
2044         assert_return(callback, -EINVAL);
2045         assert_return(!bus_pid_changed(bus), -ECHILD);
2046
2047         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2048                 if (f->callback == callback && f->userdata == userdata) {
2049                         bus->filter_callbacks_modified = true;
2050                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2051                         free(f);
2052                         return 1;
2053                 }
2054         }
2055
2056         return 0;
2057 }
2058
2059 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2060         struct bus_match_component *components = NULL;
2061         unsigned n_components = 0;
2062         uint64_t cookie = 0;
2063         int r = 0;
2064
2065         assert_return(bus, -EINVAL);
2066         assert_return(match, -EINVAL);
2067         assert_return(!bus_pid_changed(bus), -ECHILD);
2068
2069         r = bus_match_parse(match, &components, &n_components);
2070         if (r < 0)
2071                 goto finish;
2072
2073         if (bus->bus_client) {
2074                 cookie = ++bus->match_cookie;
2075
2076                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2077                 if (r < 0)
2078                         goto finish;
2079         }
2080
2081         bus->match_callbacks_modified = true;
2082         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2083         if (r < 0) {
2084                 if (bus->bus_client)
2085                         bus_remove_match_internal(bus, match, cookie);
2086         }
2087
2088 finish:
2089         bus_match_parse_free(components, n_components);
2090         return r;
2091 }
2092
2093 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2094         struct bus_match_component *components = NULL;
2095         unsigned n_components = 0;
2096         int r = 0, q = 0;
2097         uint64_t cookie = 0;
2098
2099         assert_return(bus, -EINVAL);
2100         assert_return(match, -EINVAL);
2101         assert_return(!bus_pid_changed(bus), -ECHILD);
2102
2103         r = bus_match_parse(match, &components, &n_components);
2104         if (r < 0)
2105                 return r;
2106
2107         bus->match_callbacks_modified = true;
2108         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2109
2110         if (bus->bus_client)
2111                 q = bus_remove_match_internal(bus, match, cookie);
2112
2113         bus_match_parse_free(components, n_components);
2114
2115         return r < 0 ? r : q;
2116 }
2117
2118 bool bus_pid_changed(sd_bus *bus) {
2119         assert(bus);
2120
2121         /* We don't support people creating a bus connection and
2122          * keeping it around over a fork(). Let's complain. */
2123
2124         return bus->original_pid != getpid();
2125 }