chiark / gitweb /
ca687c0a65413a0ac1739b0d37e9d638a0e66ce8
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48
49 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
50
51 static void bus_close_fds(sd_bus *b) {
52         assert(b);
53
54         if (b->input_fd >= 0)
55                 close_nointr_nofail(b->input_fd);
56
57         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
58                 close_nointr_nofail(b->output_fd);
59
60         b->input_fd = b->output_fd = -1;
61 }
62
63 static void bus_node_destroy(sd_bus *b, struct node *n) {
64         struct node_callback *c;
65         struct node_vtable *v;
66         struct node_enumerator *e;
67
68         assert(b);
69
70         if (!n)
71                 return;
72
73         while (n->child)
74                 bus_node_destroy(b, n->child);
75
76         while ((c = n->callbacks)) {
77                 LIST_REMOVE(struct node_callback, callbacks, n->callbacks, c);
78                 free(c);
79         }
80
81         while ((v = n->vtables)) {
82                 LIST_REMOVE(struct node_vtable, vtables, n->vtables, v);
83                 free(v->interface);
84                 free(v);
85         }
86
87         while ((e = n->enumerators)) {
88                 LIST_REMOVE(struct node_enumerator, enumerators, n->enumerators, e);
89                 free(e);
90         }
91
92         if (n->parent)
93                 LIST_REMOVE(struct node, siblings, n->parent->child, n);
94
95         assert_se(hashmap_remove(b->nodes, n->path) == n);
96         free(n->path);
97         free(n);
98 }
99
100 static void bus_free(sd_bus *b) {
101         struct filter_callback *f;
102         struct node *n;
103         unsigned i;
104
105         assert(b);
106
107         bus_close_fds(b);
108
109         if (b->kdbus_buffer)
110                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
111
112         free(b->rbuffer);
113         free(b->unique_name);
114         free(b->auth_buffer);
115         free(b->address);
116         free(b->kernel);
117
118         free(b->exec_path);
119         strv_free(b->exec_argv);
120
121         close_many(b->fds, b->n_fds);
122         free(b->fds);
123
124         for (i = 0; i < b->rqueue_size; i++)
125                 sd_bus_message_unref(b->rqueue[i]);
126         free(b->rqueue);
127
128         for (i = 0; i < b->wqueue_size; i++)
129                 sd_bus_message_unref(b->wqueue[i]);
130         free(b->wqueue);
131
132         hashmap_free_free(b->reply_callbacks);
133         prioq_free(b->reply_callbacks_prioq);
134
135         while ((f = b->filter_callbacks)) {
136                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
137                 free(f);
138         }
139
140         bus_match_free(&b->match_callbacks);
141
142         hashmap_free_free(b->vtable_methods);
143         hashmap_free_free(b->vtable_properties);
144
145         while ((n = hashmap_first(b->nodes)))
146                 bus_node_destroy(b, n);
147
148         hashmap_free(b->nodes);
149
150         bus_kernel_flush_memfd(b);
151
152         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
153
154         free(b);
155 }
156
157 int sd_bus_new(sd_bus **ret) {
158         sd_bus *r;
159
160         if (!ret)
161                 return -EINVAL;
162
163         r = new0(sd_bus, 1);
164         if (!r)
165                 return -ENOMEM;
166
167         r->n_ref = REFCNT_INIT;
168         r->input_fd = r->output_fd = -1;
169         r->message_version = 1;
170         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
171         r->original_pid = getpid();
172
173         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
174
175         /* We guarantee that wqueue always has space for at least one
176          * entry */
177         r->wqueue = new(sd_bus_message*, 1);
178         if (!r->wqueue) {
179                 free(r);
180                 return -ENOMEM;
181         }
182
183         *ret = r;
184         return 0;
185 }
186
187 int sd_bus_set_address(sd_bus *bus, const char *address) {
188         char *a;
189
190         if (!bus)
191                 return -EINVAL;
192         if (bus->state != BUS_UNSET)
193                 return -EPERM;
194         if (!address)
195                 return -EINVAL;
196         if (bus_pid_changed(bus))
197                 return -ECHILD;
198
199         a = strdup(address);
200         if (!a)
201                 return -ENOMEM;
202
203         free(bus->address);
204         bus->address = a;
205
206         return 0;
207 }
208
209 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
210         if (!bus)
211                 return -EINVAL;
212         if (bus->state != BUS_UNSET)
213                 return -EPERM;
214         if (input_fd < 0)
215                 return -EINVAL;
216         if (output_fd < 0)
217                 return -EINVAL;
218         if (bus_pid_changed(bus))
219                 return -ECHILD;
220
221         bus->input_fd = input_fd;
222         bus->output_fd = output_fd;
223         return 0;
224 }
225
226 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
227         char *p, **a;
228
229         if (!bus)
230                 return -EINVAL;
231         if (bus->state != BUS_UNSET)
232                 return -EPERM;
233         if (!path)
234                 return -EINVAL;
235         if (strv_isempty(argv))
236                 return -EINVAL;
237         if (bus_pid_changed(bus))
238                 return -ECHILD;
239
240         p = strdup(path);
241         if (!p)
242                 return -ENOMEM;
243
244         a = strv_copy(argv);
245         if (!a) {
246                 free(p);
247                 return -ENOMEM;
248         }
249
250         free(bus->exec_path);
251         strv_free(bus->exec_argv);
252
253         bus->exec_path = p;
254         bus->exec_argv = a;
255
256         return 0;
257 }
258
259 int sd_bus_set_bus_client(sd_bus *bus, int b) {
260         if (!bus)
261                 return -EINVAL;
262         if (bus->state != BUS_UNSET)
263                 return -EPERM;
264         if (bus_pid_changed(bus))
265                 return -ECHILD;
266
267         bus->bus_client = !!b;
268         return 0;
269 }
270
271 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
272         if (!bus)
273                 return -EINVAL;
274         if (bus->state != BUS_UNSET)
275                 return -EPERM;
276         if (bus_pid_changed(bus))
277                 return -ECHILD;
278
279         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
280         return 0;
281 }
282
283 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
284         if (!bus)
285                 return -EINVAL;
286         if (bus->state != BUS_UNSET)
287                 return -EPERM;
288         if (bus_pid_changed(bus))
289                 return -ECHILD;
290
291         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
292         return 0;
293 }
294
295 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
296         if (!bus)
297                 return -EINVAL;
298         if (bus->state != BUS_UNSET)
299                 return -EPERM;
300         if (bus_pid_changed(bus))
301                 return -ECHILD;
302
303         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
304         return 0;
305 }
306
307 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
308         if (!bus)
309                 return -EINVAL;
310         if (bus->state != BUS_UNSET)
311                 return -EPERM;
312         if (bus_pid_changed(bus))
313                 return -ECHILD;
314
315         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
316         return 0;
317 }
318
319 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
320         if (!bus)
321                 return -EINVAL;
322         if (bus->state != BUS_UNSET)
323                 return -EPERM;
324         if (bus_pid_changed(bus))
325                 return -ECHILD;
326
327         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
328         return 0;
329 }
330
331 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
332         if (!bus)
333                 return -EINVAL;
334         if (bus->state != BUS_UNSET)
335                 return -EPERM;
336         if (bus_pid_changed(bus))
337                 return -ECHILD;
338
339         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
340         return 0;
341 }
342
343 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
344         if (!bus)
345                 return -EINVAL;
346         if (bus->state != BUS_UNSET)
347                 return -EPERM;
348         if (bus_pid_changed(bus))
349                 return -ECHILD;
350
351         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
352         return 0;
353 }
354
355 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
356         if (!bus)
357                 return -EINVAL;
358         if (bus->state != BUS_UNSET)
359                 return -EPERM;
360         if (bus_pid_changed(bus))
361                 return -ECHILD;
362
363         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
364         return 0;
365 }
366
367 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
368         if (!bus)
369                 return -EINVAL;
370         if (!b && !sd_id128_equal(server_id, SD_ID128_NULL))
371                 return -EINVAL;
372         if (bus->state != BUS_UNSET)
373                 return -EPERM;
374         if (bus_pid_changed(bus))
375                 return -ECHILD;
376
377         bus->is_server = !!b;
378         bus->server_id = server_id;
379         return 0;
380 }
381
382 int sd_bus_set_anonymous(sd_bus *bus, int b) {
383         if (!bus)
384                 return -EINVAL;
385         if (bus->state != BUS_UNSET)
386                 return -EPERM;
387         if (bus_pid_changed(bus))
388                 return -ECHILD;
389
390         bus->anonymous_auth = !!b;
391         return 0;
392 }
393
394 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
395         const char *s;
396         int r;
397
398         assert(bus);
399         assert(bus->state == BUS_HELLO);
400         assert(reply);
401
402         r = bus_message_to_errno(reply);
403         if (r < 0)
404                 return r;
405
406         r = sd_bus_message_read(reply, "s", &s);
407         if (r < 0)
408                 return r;
409
410         if (!service_name_is_valid(s) || s[0] != ':')
411                 return -EBADMSG;
412
413         bus->unique_name = strdup(s);
414         if (!bus->unique_name)
415                 return -ENOMEM;
416
417         bus->state = BUS_RUNNING;
418
419         return 1;
420 }
421
422 static int bus_send_hello(sd_bus *bus) {
423         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
424         int r;
425
426         assert(bus);
427
428         if (!bus->bus_client || bus->is_kernel)
429                 return 0;
430
431         r = sd_bus_message_new_method_call(
432                         bus,
433                         "org.freedesktop.DBus",
434                         "/",
435                         "org.freedesktop.DBus",
436                         "Hello",
437                         &m);
438         if (r < 0)
439                 return r;
440
441         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
442 }
443
444 int bus_start_running(sd_bus *bus) {
445         assert(bus);
446
447         if (bus->bus_client && !bus->is_kernel) {
448                 bus->state = BUS_HELLO;
449                 return 1;
450         }
451
452         bus->state = BUS_RUNNING;
453         return 1;
454 }
455
456 static int parse_address_key(const char **p, const char *key, char **value) {
457         size_t l, n = 0;
458         const char *a;
459         char *r = NULL;
460
461         assert(p);
462         assert(*p);
463         assert(value);
464
465         if (key) {
466                 l = strlen(key);
467                 if (strncmp(*p, key, l) != 0)
468                         return 0;
469
470                 if ((*p)[l] != '=')
471                         return 0;
472
473                 if (*value)
474                         return -EINVAL;
475
476                 a = *p + l + 1;
477         } else
478                 a = *p;
479
480         while (*a != ';' && *a != ',' && *a != 0) {
481                 char c, *t;
482
483                 if (*a == '%') {
484                         int x, y;
485
486                         x = unhexchar(a[1]);
487                         if (x < 0) {
488                                 free(r);
489                                 return x;
490                         }
491
492                         y = unhexchar(a[2]);
493                         if (y < 0) {
494                                 free(r);
495                                 return y;
496                         }
497
498                         c = (char) ((x << 4) | y);
499                         a += 3;
500                 } else {
501                         c = *a;
502                         a++;
503                 }
504
505                 t = realloc(r, n + 2);
506                 if (!t) {
507                         free(r);
508                         return -ENOMEM;
509                 }
510
511                 r = t;
512                 r[n++] = c;
513         }
514
515         if (!r) {
516                 r = strdup("");
517                 if (!r)
518                         return -ENOMEM;
519         } else
520                 r[n] = 0;
521
522         if (*a == ',')
523                 a++;
524
525         *p = a;
526
527         free(*value);
528         *value = r;
529
530         return 1;
531 }
532
533 static void skip_address_key(const char **p) {
534         assert(p);
535         assert(*p);
536
537         *p += strcspn(*p, ",");
538
539         if (**p == ',')
540                 (*p) ++;
541 }
542
543 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
544         _cleanup_free_ char *path = NULL, *abstract = NULL;
545         size_t l;
546         int r;
547
548         assert(b);
549         assert(p);
550         assert(*p);
551         assert(guid);
552
553         while (**p != 0 && **p != ';') {
554                 r = parse_address_key(p, "guid", guid);
555                 if (r < 0)
556                         return r;
557                 else if (r > 0)
558                         continue;
559
560                 r = parse_address_key(p, "path", &path);
561                 if (r < 0)
562                         return r;
563                 else if (r > 0)
564                         continue;
565
566                 r = parse_address_key(p, "abstract", &abstract);
567                 if (r < 0)
568                         return r;
569                 else if (r > 0)
570                         continue;
571
572                 skip_address_key(p);
573         }
574
575         if (!path && !abstract)
576                 return -EINVAL;
577
578         if (path && abstract)
579                 return -EINVAL;
580
581         if (path) {
582                 l = strlen(path);
583                 if (l > sizeof(b->sockaddr.un.sun_path))
584                         return -E2BIG;
585
586                 b->sockaddr.un.sun_family = AF_UNIX;
587                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
588                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
589         } else if (abstract) {
590                 l = strlen(abstract);
591                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
592                         return -E2BIG;
593
594                 b->sockaddr.un.sun_family = AF_UNIX;
595                 b->sockaddr.un.sun_path[0] = 0;
596                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
597                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
598         }
599
600         return 0;
601 }
602
603 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
604         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
605         int r;
606         struct addrinfo *result, hints = {
607                 .ai_socktype = SOCK_STREAM,
608                 .ai_flags = AI_ADDRCONFIG,
609         };
610
611         assert(b);
612         assert(p);
613         assert(*p);
614         assert(guid);
615
616         while (**p != 0 && **p != ';') {
617                 r = parse_address_key(p, "guid", guid);
618                 if (r < 0)
619                         return r;
620                 else if (r > 0)
621                         continue;
622
623                 r = parse_address_key(p, "host", &host);
624                 if (r < 0)
625                         return r;
626                 else if (r > 0)
627                         continue;
628
629                 r = parse_address_key(p, "port", &port);
630                 if (r < 0)
631                         return r;
632                 else if (r > 0)
633                         continue;
634
635                 r = parse_address_key(p, "family", &family);
636                 if (r < 0)
637                         return r;
638                 else if (r > 0)
639                         continue;
640
641                 skip_address_key(p);
642         }
643
644         if (!host || !port)
645                 return -EINVAL;
646
647         if (family) {
648                 if (streq(family, "ipv4"))
649                         hints.ai_family = AF_INET;
650                 else if (streq(family, "ipv6"))
651                         hints.ai_family = AF_INET6;
652                 else
653                         return -EINVAL;
654         }
655
656         r = getaddrinfo(host, port, &hints, &result);
657         if (r == EAI_SYSTEM)
658                 return -errno;
659         else if (r != 0)
660                 return -EADDRNOTAVAIL;
661
662         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
663         b->sockaddr_size = result->ai_addrlen;
664
665         freeaddrinfo(result);
666
667         return 0;
668 }
669
670 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
671         char *path = NULL;
672         unsigned n_argv = 0, j;
673         char **argv = NULL;
674         int r;
675
676         assert(b);
677         assert(p);
678         assert(*p);
679         assert(guid);
680
681         while (**p != 0 && **p != ';') {
682                 r = parse_address_key(p, "guid", guid);
683                 if (r < 0)
684                         goto fail;
685                 else if (r > 0)
686                         continue;
687
688                 r = parse_address_key(p, "path", &path);
689                 if (r < 0)
690                         goto fail;
691                 else if (r > 0)
692                         continue;
693
694                 if (startswith(*p, "argv")) {
695                         unsigned ul;
696
697                         errno = 0;
698                         ul = strtoul(*p + 4, (char**) p, 10);
699                         if (errno > 0 || **p != '=' || ul > 256) {
700                                 r = -EINVAL;
701                                 goto fail;
702                         }
703
704                         (*p) ++;
705
706                         if (ul >= n_argv) {
707                                 char **x;
708
709                                 x = realloc(argv, sizeof(char*) * (ul + 2));
710                                 if (!x) {
711                                         r = -ENOMEM;
712                                         goto fail;
713                                 }
714
715                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
716
717                                 argv = x;
718                                 n_argv = ul + 1;
719                         }
720
721                         r = parse_address_key(p, NULL, argv + ul);
722                         if (r < 0)
723                                 goto fail;
724
725                         continue;
726                 }
727
728                 skip_address_key(p);
729         }
730
731         if (!path) {
732                 r = -EINVAL;
733                 goto fail;
734         }
735
736         /* Make sure there are no holes in the array, with the
737          * exception of argv[0] */
738         for (j = 1; j < n_argv; j++)
739                 if (!argv[j]) {
740                         r = -EINVAL;
741                         goto fail;
742                 }
743
744         if (argv && argv[0] == NULL) {
745                 argv[0] = strdup(path);
746                 if (!argv[0]) {
747                         r = -ENOMEM;
748                         goto fail;
749                 }
750         }
751
752         b->exec_path = path;
753         b->exec_argv = argv;
754         return 0;
755
756 fail:
757         for (j = 0; j < n_argv; j++)
758                 free(argv[j]);
759
760         free(argv);
761         free(path);
762         return r;
763 }
764
765 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
766         _cleanup_free_ char *path = NULL;
767         int r;
768
769         assert(b);
770         assert(p);
771         assert(*p);
772         assert(guid);
773
774         while (**p != 0 && **p != ';') {
775                 r = parse_address_key(p, "guid", guid);
776                 if (r < 0)
777                         return r;
778                 else if (r > 0)
779                         continue;
780
781                 r = parse_address_key(p, "path", &path);
782                 if (r < 0)
783                         return r;
784                 else if (r > 0)
785                         continue;
786
787                 skip_address_key(p);
788         }
789
790         if (!path)
791                 return -EINVAL;
792
793         free(b->kernel);
794         b->kernel = path;
795         path = NULL;
796
797         return 0;
798 }
799
800 static void bus_reset_parsed_address(sd_bus *b) {
801         assert(b);
802
803         zero(b->sockaddr);
804         b->sockaddr_size = 0;
805         strv_free(b->exec_argv);
806         free(b->exec_path);
807         b->exec_path = NULL;
808         b->exec_argv = NULL;
809         b->server_id = SD_ID128_NULL;
810         free(b->kernel);
811         b->kernel = NULL;
812 }
813
814 static int bus_parse_next_address(sd_bus *b) {
815         _cleanup_free_ char *guid = NULL;
816         const char *a;
817         int r;
818
819         assert(b);
820
821         if (!b->address)
822                 return 0;
823         if (b->address[b->address_index] == 0)
824                 return 0;
825
826         bus_reset_parsed_address(b);
827
828         a = b->address + b->address_index;
829
830         while (*a != 0) {
831
832                 if (*a == ';') {
833                         a++;
834                         continue;
835                 }
836
837                 if (startswith(a, "unix:")) {
838                         a += 5;
839
840                         r = parse_unix_address(b, &a, &guid);
841                         if (r < 0)
842                                 return r;
843                         break;
844
845                 } else if (startswith(a, "tcp:")) {
846
847                         a += 4;
848                         r = parse_tcp_address(b, &a, &guid);
849                         if (r < 0)
850                                 return r;
851
852                         break;
853
854                 } else if (startswith(a, "unixexec:")) {
855
856                         a += 9;
857                         r = parse_exec_address(b, &a, &guid);
858                         if (r < 0)
859                                 return r;
860
861                         break;
862
863                 } else if (startswith(a, "kernel:")) {
864
865                         a += 7;
866                         r = parse_kernel_address(b, &a, &guid);
867                         if (r < 0)
868                                 return r;
869
870                         break;
871                 }
872
873                 a = strchr(a, ';');
874                 if (!a)
875                         return 0;
876         }
877
878         if (guid) {
879                 r = sd_id128_from_string(guid, &b->server_id);
880                 if (r < 0)
881                         return r;
882         }
883
884         b->address_index = a - b->address;
885         return 1;
886 }
887
888 static int bus_start_address(sd_bus *b) {
889         int r;
890
891         assert(b);
892
893         for (;;) {
894                 sd_bus_close(b);
895
896                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
897
898                         r = bus_socket_connect(b);
899                         if (r >= 0)
900                                 return r;
901
902                         b->last_connect_error = -r;
903
904                 } else if (b->exec_path) {
905
906                         r = bus_socket_exec(b);
907                         if (r >= 0)
908                                 return r;
909
910                         b->last_connect_error = -r;
911                 } else if (b->kernel) {
912
913                         r = bus_kernel_connect(b);
914                         if (r >= 0)
915                                 return r;
916
917                         b->last_connect_error = -r;
918                 }
919
920                 r = bus_parse_next_address(b);
921                 if (r < 0)
922                         return r;
923                 if (r == 0)
924                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
925         }
926 }
927
928 int bus_next_address(sd_bus *b) {
929         assert(b);
930
931         bus_reset_parsed_address(b);
932         return bus_start_address(b);
933 }
934
935 static int bus_start_fd(sd_bus *b) {
936         struct stat st;
937         int r;
938
939         assert(b);
940         assert(b->input_fd >= 0);
941         assert(b->output_fd >= 0);
942
943         r = fd_nonblock(b->input_fd, true);
944         if (r < 0)
945                 return r;
946
947         r = fd_cloexec(b->input_fd, true);
948         if (r < 0)
949                 return r;
950
951         if (b->input_fd != b->output_fd) {
952                 r = fd_nonblock(b->output_fd, true);
953                 if (r < 0)
954                         return r;
955
956                 r = fd_cloexec(b->output_fd, true);
957                 if (r < 0)
958                         return r;
959         }
960
961         if (fstat(b->input_fd, &st) < 0)
962                 return -errno;
963
964         if (S_ISCHR(b->input_fd))
965                 return bus_kernel_take_fd(b);
966         else
967                 return bus_socket_take_fd(b);
968 }
969
970 int sd_bus_start(sd_bus *bus) {
971         int r;
972
973         if (!bus)
974                 return -EINVAL;
975         if (bus->state != BUS_UNSET)
976                 return -EPERM;
977         if (bus_pid_changed(bus))
978                 return -ECHILD;
979
980         bus->state = BUS_OPENING;
981
982         if (bus->is_server && bus->bus_client)
983                 return -EINVAL;
984
985         if (bus->input_fd >= 0)
986                 r = bus_start_fd(bus);
987         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
988                 r = bus_start_address(bus);
989         else
990                 return -EINVAL;
991
992         if (r < 0)
993                 return r;
994
995         return bus_send_hello(bus);
996 }
997
998 int sd_bus_open_system(sd_bus **ret) {
999         const char *e;
1000         sd_bus *b;
1001         int r;
1002
1003         if (!ret)
1004                 return -EINVAL;
1005
1006         r = sd_bus_new(&b);
1007         if (r < 0)
1008                 return r;
1009
1010         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
1011         if (e) {
1012                 r = sd_bus_set_address(b, e);
1013                 if (r < 0)
1014                         goto fail;
1015         } else {
1016                 b->sockaddr.un.sun_family = AF_UNIX;
1017                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
1018                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
1019         }
1020
1021         b->bus_client = true;
1022
1023         r = sd_bus_start(b);
1024         if (r < 0)
1025                 goto fail;
1026
1027         *ret = b;
1028         return 0;
1029
1030 fail:
1031         bus_free(b);
1032         return r;
1033 }
1034
1035 int sd_bus_open_user(sd_bus **ret) {
1036         const char *e;
1037         sd_bus *b;
1038         size_t l;
1039         int r;
1040
1041         if (!ret)
1042                 return -EINVAL;
1043
1044         r = sd_bus_new(&b);
1045         if (r < 0)
1046                 return r;
1047
1048         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1049         if (e) {
1050                 r = sd_bus_set_address(b, e);
1051                 if (r < 0)
1052                         goto fail;
1053         } else {
1054                 e = secure_getenv("XDG_RUNTIME_DIR");
1055                 if (!e) {
1056                         r = -ENOENT;
1057                         goto fail;
1058                 }
1059
1060                 l = strlen(e);
1061                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1062                         r = -E2BIG;
1063                         goto fail;
1064                 }
1065
1066                 b->sockaddr.un.sun_family = AF_UNIX;
1067                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1068                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1069         }
1070
1071         b->bus_client = true;
1072
1073         r = sd_bus_start(b);
1074         if (r < 0)
1075                 goto fail;
1076
1077         *ret = b;
1078         return 0;
1079
1080 fail:
1081         bus_free(b);
1082         return r;
1083 }
1084
1085 void sd_bus_close(sd_bus *bus) {
1086         if (!bus)
1087                 return;
1088         if (bus->state == BUS_CLOSED)
1089                 return;
1090         if (bus_pid_changed(bus))
1091                 return;
1092
1093         bus->state = BUS_CLOSED;
1094
1095         if (!bus->is_kernel)
1096                 bus_close_fds(bus);
1097
1098         /* We'll leave the fd open in case this is a kernel bus, since
1099          * there might still be memblocks around that reference this
1100          * bus, and they might need to invoke the
1101          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1102          * freed. */
1103 }
1104
1105 sd_bus *sd_bus_ref(sd_bus *bus) {
1106         if (!bus)
1107                 return NULL;
1108
1109         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1110
1111         return bus;
1112 }
1113
1114 sd_bus *sd_bus_unref(sd_bus *bus) {
1115         if (!bus)
1116                 return NULL;
1117
1118         if (REFCNT_DEC(bus->n_ref) <= 0)
1119                 bus_free(bus);
1120
1121         return NULL;
1122 }
1123
1124 int sd_bus_is_open(sd_bus *bus) {
1125         if (!bus)
1126                 return -EINVAL;
1127         if (bus_pid_changed(bus))
1128                 return -ECHILD;
1129
1130         return BUS_IS_OPEN(bus->state);
1131 }
1132
1133 int sd_bus_can_send(sd_bus *bus, char type) {
1134         int r;
1135
1136         if (!bus)
1137                 return -EINVAL;
1138         if (bus->state == BUS_UNSET)
1139                 return -ENOTCONN;
1140         if (bus_pid_changed(bus))
1141                 return -ECHILD;
1142
1143         if (type == SD_BUS_TYPE_UNIX_FD) {
1144                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1145                         return 0;
1146
1147                 r = bus_ensure_running(bus);
1148                 if (r < 0)
1149                         return r;
1150
1151                 return bus->can_fds;
1152         }
1153
1154         return bus_type_is_valid(type);
1155 }
1156
1157 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1158         int r;
1159
1160         if (!bus)
1161                 return -EINVAL;
1162         if (!server_id)
1163                 return -EINVAL;
1164         if (bus_pid_changed(bus))
1165                 return -ECHILD;
1166
1167         r = bus_ensure_running(bus);
1168         if (r < 0)
1169                 return r;
1170
1171         *server_id = bus->server_id;
1172         return 0;
1173 }
1174
1175 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1176         assert(m);
1177
1178         if (m->header->version > b->message_version)
1179                 return -EPERM;
1180
1181         if (m->sealed)
1182                 return 0;
1183
1184         return bus_message_seal(m, ++b->serial);
1185 }
1186
1187 static int dispatch_wqueue(sd_bus *bus) {
1188         int r, ret = 0;
1189
1190         assert(bus);
1191         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1192
1193         while (bus->wqueue_size > 0) {
1194
1195                 if (bus->is_kernel)
1196                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1197                 else
1198                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1199
1200                 if (r < 0) {
1201                         sd_bus_close(bus);
1202                         return r;
1203                 } else if (r == 0)
1204                         /* Didn't do anything this time */
1205                         return ret;
1206                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1207                         /* Fully written. Let's drop the entry from
1208                          * the queue.
1209                          *
1210                          * This isn't particularly optimized, but
1211                          * well, this is supposed to be our worst-case
1212                          * buffer only, and the socket buffer is
1213                          * supposed to be our primary buffer, and if
1214                          * it got full, then all bets are off
1215                          * anyway. */
1216
1217                         sd_bus_message_unref(bus->wqueue[0]);
1218                         bus->wqueue_size --;
1219                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1220                         bus->windex = 0;
1221
1222                         ret = 1;
1223                 }
1224         }
1225
1226         return ret;
1227 }
1228
1229 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1230         sd_bus_message *z = NULL;
1231         int r, ret = 0;
1232
1233         assert(bus);
1234         assert(m);
1235         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1236
1237         if (bus->rqueue_size > 0) {
1238                 /* Dispatch a queued message */
1239
1240                 *m = bus->rqueue[0];
1241                 bus->rqueue_size --;
1242                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1243                 return 1;
1244         }
1245
1246         /* Try to read a new message */
1247         do {
1248                 if (bus->is_kernel)
1249                         r = bus_kernel_read_message(bus, &z);
1250                 else
1251                         r = bus_socket_read_message(bus, &z);
1252
1253                 if (r < 0) {
1254                         sd_bus_close(bus);
1255                         return r;
1256                 }
1257                 if (r == 0)
1258                         return ret;
1259
1260                 ret = 1;
1261         } while (!z);
1262
1263         *m = z;
1264         return ret;
1265 }
1266
1267 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1268         int r;
1269
1270         if (!bus)
1271                 return -EINVAL;
1272         if (!BUS_IS_OPEN(bus->state))
1273                 return -ENOTCONN;
1274         if (!m)
1275                 return -EINVAL;
1276         if (bus_pid_changed(bus))
1277                 return -ECHILD;
1278
1279         if (m->n_fds > 0) {
1280                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1281                 if (r < 0)
1282                         return r;
1283                 if (r == 0)
1284                         return -ENOTSUP;
1285         }
1286
1287         /* If the serial number isn't kept, then we know that no reply
1288          * is expected */
1289         if (!serial && !m->sealed)
1290                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1291
1292         r = bus_seal_message(bus, m);
1293         if (r < 0)
1294                 return r;
1295
1296         /* If this is a reply and no reply was requested, then let's
1297          * suppress this, if we can */
1298         if (m->dont_send && !serial)
1299                 return 0;
1300
1301         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1302                 size_t idx = 0;
1303
1304                 if (bus->is_kernel)
1305                         r = bus_kernel_write_message(bus, m);
1306                 else
1307                         r = bus_socket_write_message(bus, m, &idx);
1308
1309                 if (r < 0) {
1310                         sd_bus_close(bus);
1311                         return r;
1312                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1313                         /* Wasn't fully written. So let's remember how
1314                          * much was written. Note that the first entry
1315                          * of the wqueue array is always allocated so
1316                          * that we always can remember how much was
1317                          * written. */
1318                         bus->wqueue[0] = sd_bus_message_ref(m);
1319                         bus->wqueue_size = 1;
1320                         bus->windex = idx;
1321                 }
1322         } else {
1323                 sd_bus_message **q;
1324
1325                 /* Just append it to the queue. */
1326
1327                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1328                         return -ENOBUFS;
1329
1330                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1331                 if (!q)
1332                         return -ENOMEM;
1333
1334                 bus->wqueue = q;
1335                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1336         }
1337
1338         if (serial)
1339                 *serial = BUS_MESSAGE_SERIAL(m);
1340
1341         return 0;
1342 }
1343
1344 static usec_t calc_elapse(uint64_t usec) {
1345         if (usec == (uint64_t) -1)
1346                 return 0;
1347
1348         if (usec == 0)
1349                 usec = BUS_DEFAULT_TIMEOUT;
1350
1351         return now(CLOCK_MONOTONIC) + usec;
1352 }
1353
1354 static int timeout_compare(const void *a, const void *b) {
1355         const struct reply_callback *x = a, *y = b;
1356
1357         if (x->timeout != 0 && y->timeout == 0)
1358                 return -1;
1359
1360         if (x->timeout == 0 && y->timeout != 0)
1361                 return 1;
1362
1363         if (x->timeout < y->timeout)
1364                 return -1;
1365
1366         if (x->timeout > y->timeout)
1367                 return 1;
1368
1369         return 0;
1370 }
1371
1372 int sd_bus_send_with_reply(
1373                 sd_bus *bus,
1374                 sd_bus_message *m,
1375                 sd_bus_message_handler_t callback,
1376                 void *userdata,
1377                 uint64_t usec,
1378                 uint64_t *serial) {
1379
1380         struct reply_callback *c;
1381         int r;
1382
1383         if (!bus)
1384                 return -EINVAL;
1385         if (!BUS_IS_OPEN(bus->state))
1386                 return -ENOTCONN;
1387         if (!m)
1388                 return -EINVAL;
1389         if (!callback)
1390                 return -EINVAL;
1391         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1392                 return -EINVAL;
1393         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1394                 return -EINVAL;
1395         if (bus_pid_changed(bus))
1396                 return -ECHILD;
1397
1398         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1399         if (r < 0)
1400                 return r;
1401
1402         if (usec != (uint64_t) -1) {
1403                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1404                 if (r < 0)
1405                         return r;
1406         }
1407
1408         r = bus_seal_message(bus, m);
1409         if (r < 0)
1410                 return r;
1411
1412         c = new0(struct reply_callback, 1);
1413         if (!c)
1414                 return -ENOMEM;
1415
1416         c->callback = callback;
1417         c->userdata = userdata;
1418         c->serial = BUS_MESSAGE_SERIAL(m);
1419         c->timeout = calc_elapse(usec);
1420
1421         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1422         if (r < 0) {
1423                 free(c);
1424                 return r;
1425         }
1426
1427         if (c->timeout != 0) {
1428                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1429                 if (r < 0) {
1430                         c->timeout = 0;
1431                         sd_bus_send_with_reply_cancel(bus, c->serial);
1432                         return r;
1433                 }
1434         }
1435
1436         r = sd_bus_send(bus, m, serial);
1437         if (r < 0) {
1438                 sd_bus_send_with_reply_cancel(bus, c->serial);
1439                 return r;
1440         }
1441
1442         return r;
1443 }
1444
1445 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1446         struct reply_callback *c;
1447
1448         if (!bus)
1449                 return -EINVAL;
1450         if (serial == 0)
1451                 return -EINVAL;
1452         if (bus_pid_changed(bus))
1453                 return -ECHILD;
1454
1455         c = hashmap_remove(bus->reply_callbacks, &serial);
1456         if (!c)
1457                 return 0;
1458
1459         if (c->timeout != 0)
1460                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1461
1462         free(c);
1463         return 1;
1464 }
1465
1466 int bus_ensure_running(sd_bus *bus) {
1467         int r;
1468
1469         assert(bus);
1470
1471         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1472                 return -ENOTCONN;
1473         if (bus->state == BUS_RUNNING)
1474                 return 1;
1475
1476         for (;;) {
1477                 r = sd_bus_process(bus, NULL);
1478                 if (r < 0)
1479                         return r;
1480                 if (bus->state == BUS_RUNNING)
1481                         return 1;
1482                 if (r > 0)
1483                         continue;
1484
1485                 r = sd_bus_wait(bus, (uint64_t) -1);
1486                 if (r < 0)
1487                         return r;
1488         }
1489 }
1490
1491 int sd_bus_send_with_reply_and_block(
1492                 sd_bus *bus,
1493                 sd_bus_message *m,
1494                 uint64_t usec,
1495                 sd_bus_error *error,
1496                 sd_bus_message **reply) {
1497
1498         int r;
1499         usec_t timeout;
1500         uint64_t serial;
1501         bool room = false;
1502
1503         if (!bus)
1504                 return -EINVAL;
1505         if (!BUS_IS_OPEN(bus->state))
1506                 return -ENOTCONN;
1507         if (!m)
1508                 return -EINVAL;
1509         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1510                 return -EINVAL;
1511         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1512                 return -EINVAL;
1513         if (bus_error_is_dirty(error))
1514                 return -EINVAL;
1515         if (bus_pid_changed(bus))
1516                 return -ECHILD;
1517
1518         r = bus_ensure_running(bus);
1519         if (r < 0)
1520                 return r;
1521
1522         r = sd_bus_send(bus, m, &serial);
1523         if (r < 0)
1524                 return r;
1525
1526         timeout = calc_elapse(usec);
1527
1528         for (;;) {
1529                 usec_t left;
1530                 sd_bus_message *incoming = NULL;
1531
1532                 if (!room) {
1533                         sd_bus_message **q;
1534
1535                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1536                                 return -ENOBUFS;
1537
1538                         /* Make sure there's room for queuing this
1539                          * locally, before we read the message */
1540
1541                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1542                         if (!q)
1543                                 return -ENOMEM;
1544
1545                         bus->rqueue = q;
1546                         room = true;
1547                 }
1548
1549                 if (bus->is_kernel)
1550                         r = bus_kernel_read_message(bus, &incoming);
1551                 else
1552                         r = bus_socket_read_message(bus, &incoming);
1553                 if (r < 0)
1554                         return r;
1555                 if (incoming) {
1556
1557                         if (incoming->reply_serial == serial) {
1558                                 /* Found a match! */
1559
1560                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1561
1562                                         if (reply)
1563                                                 *reply = incoming;
1564                                         else
1565                                                 sd_bus_message_unref(incoming);
1566
1567                                         return 0;
1568                                 }
1569
1570                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1571                                         int k;
1572
1573                                         r = sd_bus_error_copy(error, &incoming->error);
1574                                         if (r < 0) {
1575                                                 sd_bus_message_unref(incoming);
1576                                                 return r;
1577                                         }
1578
1579                                         k = bus_error_to_errno(&incoming->error);
1580                                         sd_bus_message_unref(incoming);
1581                                         return k;
1582                                 }
1583
1584                                 sd_bus_message_unref(incoming);
1585                                 return -EIO;
1586                         }
1587
1588                         /* There's already guaranteed to be room for
1589                          * this, so need to resize things here */
1590                         bus->rqueue[bus->rqueue_size ++] = incoming;
1591                         room = false;
1592
1593                         /* Try to read more, right-away */
1594                         continue;
1595                 }
1596                 if (r != 0)
1597                         continue;
1598
1599                 if (timeout > 0) {
1600                         usec_t n;
1601
1602                         n = now(CLOCK_MONOTONIC);
1603                         if (n >= timeout)
1604                                 return -ETIMEDOUT;
1605
1606                         left = timeout - n;
1607                 } else
1608                         left = (uint64_t) -1;
1609
1610                 r = bus_poll(bus, true, left);
1611                 if (r < 0)
1612                         return r;
1613
1614                 r = dispatch_wqueue(bus);
1615                 if (r < 0)
1616                         return r;
1617         }
1618 }
1619
1620 int sd_bus_get_fd(sd_bus *bus) {
1621         if (!bus)
1622                 return -EINVAL;
1623         if (!BUS_IS_OPEN(bus->state))
1624                 return -ENOTCONN;
1625         if (bus->input_fd != bus->output_fd)
1626                 return -EPERM;
1627         if (bus_pid_changed(bus))
1628                 return -ECHILD;
1629
1630         return bus->input_fd;
1631 }
1632
1633 int sd_bus_get_events(sd_bus *bus) {
1634         int flags = 0;
1635
1636         if (!bus)
1637                 return -EINVAL;
1638         if (!BUS_IS_OPEN(bus->state))
1639                 return -ENOTCONN;
1640         if (bus_pid_changed(bus))
1641                 return -ECHILD;
1642
1643         if (bus->state == BUS_OPENING)
1644                 flags |= POLLOUT;
1645         else if (bus->state == BUS_AUTHENTICATING) {
1646
1647                 if (bus_socket_auth_needs_write(bus))
1648                         flags |= POLLOUT;
1649
1650                 flags |= POLLIN;
1651
1652         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1653                 if (bus->rqueue_size <= 0)
1654                         flags |= POLLIN;
1655                 if (bus->wqueue_size > 0)
1656                         flags |= POLLOUT;
1657         }
1658
1659         return flags;
1660 }
1661
1662 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1663         struct reply_callback *c;
1664
1665         if (!bus)
1666                 return -EINVAL;
1667         if (!timeout_usec)
1668                 return -EINVAL;
1669         if (!BUS_IS_OPEN(bus->state))
1670                 return -ENOTCONN;
1671         if (bus_pid_changed(bus))
1672                 return -ECHILD;
1673
1674         if (bus->state == BUS_AUTHENTICATING) {
1675                 *timeout_usec = bus->auth_timeout;
1676                 return 1;
1677         }
1678
1679         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1680                 *timeout_usec = (uint64_t) -1;
1681                 return 0;
1682         }
1683
1684         c = prioq_peek(bus->reply_callbacks_prioq);
1685         if (!c) {
1686                 *timeout_usec = (uint64_t) -1;
1687                 return 0;
1688         }
1689
1690         *timeout_usec = c->timeout;
1691         return 1;
1692 }
1693
1694 static int process_timeout(sd_bus *bus) {
1695         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1696         struct reply_callback *c;
1697         usec_t n;
1698         int r;
1699
1700         assert(bus);
1701
1702         c = prioq_peek(bus->reply_callbacks_prioq);
1703         if (!c)
1704                 return 0;
1705
1706         n = now(CLOCK_MONOTONIC);
1707         if (c->timeout > n)
1708                 return 0;
1709
1710         r = bus_message_new_synthetic_error(
1711                         bus,
1712                         c->serial,
1713                         &SD_BUS_ERROR_MAKE("org.freedesktop.DBus.Error.Timeout", "Timed out"),
1714                         &m);
1715         if (r < 0)
1716                 return r;
1717
1718         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1719         hashmap_remove(bus->reply_callbacks, &c->serial);
1720
1721         r = c->callback(bus, m, c->userdata);
1722         free(c);
1723
1724         return r < 0 ? r : 1;
1725 }
1726
1727 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1728         assert(bus);
1729         assert(m);
1730
1731         if (bus->state != BUS_HELLO)
1732                 return 0;
1733
1734         /* Let's make sure the first message on the bus is the HELLO
1735          * reply. But note that we don't actually parse the message
1736          * here (we leave that to the usual handling), we just verify
1737          * we don't let any earlier msg through. */
1738
1739         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1740             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1741                 return -EIO;
1742
1743         if (m->reply_serial != bus->hello_serial)
1744                 return -EIO;
1745
1746         return 0;
1747 }
1748
1749 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1750         struct reply_callback *c;
1751         int r;
1752
1753         assert(bus);
1754         assert(m);
1755
1756         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1757             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1758                 return 0;
1759
1760         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1761         if (!c)
1762                 return 0;
1763
1764         if (c->timeout != 0)
1765                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1766
1767         r = sd_bus_message_rewind(m, true);
1768         if (r < 0)
1769                 return r;
1770
1771         r = c->callback(bus, m, c->userdata);
1772         free(c);
1773
1774         return r;
1775 }
1776
1777 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1778         struct filter_callback *l;
1779         int r;
1780
1781         assert(bus);
1782         assert(m);
1783
1784         do {
1785                 bus->filter_callbacks_modified = false;
1786
1787                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1788
1789                         if (bus->filter_callbacks_modified)
1790                                 break;
1791
1792                         /* Don't run this more than once per iteration */
1793                         if (l->last_iteration == bus->iteration_counter)
1794                                 continue;
1795
1796                         l->last_iteration = bus->iteration_counter;
1797
1798                         r = sd_bus_message_rewind(m, true);
1799                         if (r < 0)
1800                                 return r;
1801
1802                         r = l->callback(bus, m, l->userdata);
1803                         if (r != 0)
1804                                 return r;
1805
1806                 }
1807
1808         } while (bus->filter_callbacks_modified);
1809
1810         return 0;
1811 }
1812
1813 static int process_match(sd_bus *bus, sd_bus_message *m) {
1814         int r;
1815
1816         assert(bus);
1817         assert(m);
1818
1819         do {
1820                 bus->match_callbacks_modified = false;
1821
1822                 r = bus_match_run(bus, &bus->match_callbacks, m);
1823                 if (r != 0)
1824                         return r;
1825
1826         } while (bus->match_callbacks_modified);
1827
1828         return 0;
1829 }
1830
1831 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1832         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1833         int r;
1834
1835         assert(bus);
1836         assert(m);
1837
1838         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1839                 return 0;
1840
1841         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1842                 return 0;
1843
1844         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1845                 return 1;
1846
1847         if (streq_ptr(m->member, "Ping"))
1848                 r = sd_bus_message_new_method_return(bus, m, &reply);
1849         else if (streq_ptr(m->member, "GetMachineId")) {
1850                 sd_id128_t id;
1851                 char sid[33];
1852
1853                 r = sd_id128_get_machine(&id);
1854                 if (r < 0)
1855                         return r;
1856
1857                 r = sd_bus_message_new_method_return(bus, m, &reply);
1858                 if (r < 0)
1859                         return r;
1860
1861                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1862         } else {
1863                 r = sd_bus_message_new_method_errorf(
1864                                 bus, m, &reply,
1865                                 "org.freedesktop.DBus.Error.UnknownMethod",
1866                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1867         }
1868
1869         if (r < 0)
1870                 return r;
1871
1872         r = sd_bus_send(bus, reply, NULL);
1873         if (r < 0)
1874                 return r;
1875
1876         return 1;
1877 }
1878
1879 static int process_message(sd_bus *bus, sd_bus_message *m) {
1880         int r;
1881
1882         assert(bus);
1883         assert(m);
1884
1885         bus->iteration_counter++;
1886
1887         r = process_hello(bus, m);
1888         if (r != 0)
1889                 return r;
1890
1891         r = process_reply(bus, m);
1892         if (r != 0)
1893                 return r;
1894
1895         r = process_filter(bus, m);
1896         if (r != 0)
1897                 return r;
1898
1899         r = process_match(bus, m);
1900         if (r != 0)
1901                 return r;
1902
1903         r = process_builtin(bus, m);
1904         if (r != 0)
1905                 return r;
1906
1907         return bus_process_object(bus, m);
1908 }
1909
1910 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1911         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1912         int r;
1913
1914         assert(bus);
1915         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1916
1917         r = process_timeout(bus);
1918         if (r != 0)
1919                 goto null_message;
1920
1921         r = dispatch_wqueue(bus);
1922         if (r != 0)
1923                 goto null_message;
1924
1925         r = dispatch_rqueue(bus, &m);
1926         if (r < 0)
1927                 return r;
1928         if (!m)
1929                 goto null_message;
1930
1931         r = process_message(bus, m);
1932         if (r != 0)
1933                 goto null_message;
1934
1935         if (ret) {
1936                 r = sd_bus_message_rewind(m, true);
1937                 if (r < 0)
1938                         return r;
1939
1940                 *ret = m;
1941                 m = NULL;
1942                 return 1;
1943         }
1944
1945         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1946
1947                 r = sd_bus_reply_method_errorf(
1948                                 bus, m,
1949                                 "org.freedesktop.DBus.Error.UnknownObject",
1950                                 "Unknown object '%s'.", m->path);
1951                 if (r < 0)
1952                         return r;
1953         }
1954
1955         return 1;
1956
1957 null_message:
1958         if (r >= 0 && ret)
1959                 *ret = NULL;
1960
1961         return r;
1962 }
1963
1964 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1965         int r;
1966
1967         /* Returns 0 when we didn't do anything. This should cause the
1968          * caller to invoke sd_bus_wait() before returning the next
1969          * time. Returns > 0 when we did something, which possibly
1970          * means *ret is filled in with an unprocessed message. */
1971
1972         if (!bus)
1973                 return -EINVAL;
1974         if (bus_pid_changed(bus))
1975                 return -ECHILD;
1976
1977         /* We don't allow recursively invoking sd_bus_process(). */
1978         if (bus->processing)
1979                 return -EBUSY;
1980
1981         switch (bus->state) {
1982
1983         case BUS_UNSET:
1984         case BUS_CLOSED:
1985                 return -ENOTCONN;
1986
1987         case BUS_OPENING:
1988                 r = bus_socket_process_opening(bus);
1989                 if (r < 0)
1990                         return r;
1991                 if (ret)
1992                         *ret = NULL;
1993                 return r;
1994
1995         case BUS_AUTHENTICATING:
1996
1997                 r = bus_socket_process_authenticating(bus);
1998                 if (r < 0)
1999                         return r;
2000                 if (ret)
2001                         *ret = NULL;
2002                 return r;
2003
2004         case BUS_RUNNING:
2005         case BUS_HELLO:
2006
2007                 bus->processing = true;
2008                 r = process_running(bus, ret);
2009                 bus->processing = false;
2010
2011                 return r;
2012         }
2013
2014         assert_not_reached("Unknown state");
2015 }
2016
2017 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
2018         struct pollfd p[2] = {};
2019         int r, e, n;
2020         struct timespec ts;
2021         usec_t until, m;
2022
2023         assert(bus);
2024
2025         if (!BUS_IS_OPEN(bus->state))
2026                 return -ENOTCONN;
2027
2028         e = sd_bus_get_events(bus);
2029         if (e < 0)
2030                 return e;
2031
2032         if (need_more)
2033                 e |= POLLIN;
2034
2035         r = sd_bus_get_timeout(bus, &until);
2036         if (r < 0)
2037                 return r;
2038         if (r == 0)
2039                 m = (uint64_t) -1;
2040         else {
2041                 usec_t nw;
2042                 nw = now(CLOCK_MONOTONIC);
2043                 m = until > nw ? until - nw : 0;
2044         }
2045
2046         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2047                 m = timeout_usec;
2048
2049         p[0].fd = bus->input_fd;
2050         if (bus->output_fd == bus->input_fd) {
2051                 p[0].events = e;
2052                 n = 1;
2053         } else {
2054                 p[0].events = e & POLLIN;
2055                 p[1].fd = bus->output_fd;
2056                 p[1].events = e & POLLOUT;
2057                 n = 2;
2058         }
2059
2060         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2061         if (r < 0)
2062                 return -errno;
2063
2064         return r > 0 ? 1 : 0;
2065 }
2066
2067 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2068
2069         if (!bus)
2070                 return -EINVAL;
2071         if (!BUS_IS_OPEN(bus->state))
2072                 return -ENOTCONN;
2073         if (bus_pid_changed(bus))
2074                 return -ECHILD;
2075
2076         if (bus->rqueue_size > 0)
2077                 return 0;
2078
2079         return bus_poll(bus, false, timeout_usec);
2080 }
2081
2082 int sd_bus_flush(sd_bus *bus) {
2083         int r;
2084
2085         if (!bus)
2086                 return -EINVAL;
2087         if (!BUS_IS_OPEN(bus->state))
2088                 return -ENOTCONN;
2089         if (bus_pid_changed(bus))
2090                 return -ECHILD;
2091
2092         r = bus_ensure_running(bus);
2093         if (r < 0)
2094                 return r;
2095
2096         if (bus->wqueue_size <= 0)
2097                 return 0;
2098
2099         for (;;) {
2100                 r = dispatch_wqueue(bus);
2101                 if (r < 0)
2102                         return r;
2103
2104                 if (bus->wqueue_size <= 0)
2105                         return 0;
2106
2107                 r = bus_poll(bus, false, (uint64_t) -1);
2108                 if (r < 0)
2109                         return r;
2110         }
2111 }
2112
2113 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2114         struct filter_callback *f;
2115
2116         if (!bus)
2117                 return -EINVAL;
2118         if (!callback)
2119                 return -EINVAL;
2120         if (bus_pid_changed(bus))
2121                 return -ECHILD;
2122
2123         f = new0(struct filter_callback, 1);
2124         if (!f)
2125                 return -ENOMEM;
2126         f->callback = callback;
2127         f->userdata = userdata;
2128
2129         bus->filter_callbacks_modified = true;
2130         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2131         return 0;
2132 }
2133
2134 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2135         struct filter_callback *f;
2136
2137         if (!bus)
2138                 return -EINVAL;
2139         if (!callback)
2140                 return -EINVAL;
2141         if (bus_pid_changed(bus))
2142                 return -ECHILD;
2143
2144         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2145                 if (f->callback == callback && f->userdata == userdata) {
2146                         bus->filter_callbacks_modified = true;
2147                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2148                         free(f);
2149                         return 1;
2150                 }
2151         }
2152
2153         return 0;
2154 }
2155
2156 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2157         struct bus_match_component *components = NULL;
2158         unsigned n_components = 0;
2159         uint64_t cookie = 0;
2160         int r = 0;
2161
2162         if (!bus)
2163                 return -EINVAL;
2164         if (!match)
2165                 return -EINVAL;
2166         if (bus_pid_changed(bus))
2167                 return -ECHILD;
2168
2169         r = bus_match_parse(match, &components, &n_components);
2170         if (r < 0)
2171                 goto finish;
2172
2173         if (bus->bus_client) {
2174                 cookie = ++bus->match_cookie;
2175
2176                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2177                 if (r < 0)
2178                         goto finish;
2179         }
2180
2181         bus->match_callbacks_modified = true;
2182         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2183         if (r < 0) {
2184                 if (bus->bus_client)
2185                         bus_remove_match_internal(bus, match, cookie);
2186         }
2187
2188 finish:
2189         bus_match_parse_free(components, n_components);
2190         return r;
2191 }
2192
2193 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2194         struct bus_match_component *components = NULL;
2195         unsigned n_components = 0;
2196         int r = 0, q = 0;
2197         uint64_t cookie = 0;
2198
2199         if (!bus)
2200                 return -EINVAL;
2201         if (!match)
2202                 return -EINVAL;
2203         if (bus_pid_changed(bus))
2204                 return -ECHILD;
2205
2206         r = bus_match_parse(match, &components, &n_components);
2207         if (r < 0)
2208                 return r;
2209
2210         bus->match_callbacks_modified = true;
2211         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2212
2213         if (bus->bus_client)
2214                 q = bus_remove_match_internal(bus, match, cookie);
2215
2216         bus_match_parse_free(components, n_components);
2217
2218         return r < 0 ? r : q;
2219 }
2220
2221 bool bus_pid_changed(sd_bus *bus) {
2222         assert(bus);
2223
2224         /* We don't support people creating a bus connection and
2225          * keeping it around over a fork(). Let's complain. */
2226
2227         return bus->original_pid != getpid();
2228 }