chiark / gitweb /
bus: move ssh support into public API of libsystem-bus
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29 #include <sys/mman.h>
30 #include <pthread.h>
31
32 #include "util.h"
33 #include "macro.h"
34 #include "strv.h"
35 #include "set.h"
36 #include "missing.h"
37
38 #include "sd-bus.h"
39 #include "bus-internal.h"
40 #include "bus-message.h"
41 #include "bus-type.h"
42 #include "bus-socket.h"
43 #include "bus-kernel.h"
44 #include "bus-control.h"
45 #include "bus-introspect.h"
46 #include "bus-signature.h"
47 #include "bus-objects.h"
48 #include "bus-util.h"
49
50 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
51
52 static void bus_close_fds(sd_bus *b) {
53         assert(b);
54
55         if (b->input_fd >= 0)
56                 close_nointr_nofail(b->input_fd);
57
58         if (b->output_fd >= 0 && b->output_fd != b->input_fd)
59                 close_nointr_nofail(b->output_fd);
60
61         b->input_fd = b->output_fd = -1;
62 }
63
64 static void bus_node_destroy(sd_bus *b, struct node *n) {
65         struct node_callback *c;
66         struct node_vtable *v;
67         struct node_enumerator *e;
68
69         assert(b);
70
71         if (!n)
72                 return;
73
74         while (n->child)
75                 bus_node_destroy(b, n->child);
76
77         while ((c = n->callbacks)) {
78                 LIST_REMOVE(callbacks, n->callbacks, c);
79                 free(c);
80         }
81
82         while ((v = n->vtables)) {
83                 LIST_REMOVE(vtables, n->vtables, v);
84                 free(v->interface);
85                 free(v);
86         }
87
88         while ((e = n->enumerators)) {
89                 LIST_REMOVE(enumerators, n->enumerators, e);
90                 free(e);
91         }
92
93         if (n->parent)
94                 LIST_REMOVE(siblings, n->parent->child, n);
95
96         assert_se(hashmap_remove(b->nodes, n->path) == n);
97         free(n->path);
98         free(n);
99 }
100
101 static void bus_free(sd_bus *b) {
102         struct filter_callback *f;
103         struct node *n;
104         unsigned i;
105
106         assert(b);
107
108         sd_bus_detach_event(b);
109
110         bus_close_fds(b);
111
112         if (b->kdbus_buffer)
113                 munmap(b->kdbus_buffer, KDBUS_POOL_SIZE);
114
115         free(b->rbuffer);
116         free(b->unique_name);
117         free(b->auth_buffer);
118         free(b->address);
119         free(b->kernel);
120
121         free(b->exec_path);
122         strv_free(b->exec_argv);
123
124         close_many(b->fds, b->n_fds);
125         free(b->fds);
126
127         for (i = 0; i < b->rqueue_size; i++)
128                 sd_bus_message_unref(b->rqueue[i]);
129         free(b->rqueue);
130
131         for (i = 0; i < b->wqueue_size; i++)
132                 sd_bus_message_unref(b->wqueue[i]);
133         free(b->wqueue);
134
135         hashmap_free_free(b->reply_callbacks);
136         prioq_free(b->reply_callbacks_prioq);
137
138         while ((f = b->filter_callbacks)) {
139                 LIST_REMOVE(callbacks, b->filter_callbacks, f);
140                 free(f);
141         }
142
143         bus_match_free(&b->match_callbacks);
144
145         hashmap_free_free(b->vtable_methods);
146         hashmap_free_free(b->vtable_properties);
147
148         while ((n = hashmap_first(b->nodes)))
149                 bus_node_destroy(b, n);
150
151         hashmap_free(b->nodes);
152
153         bus_kernel_flush_memfd(b);
154
155         assert_se(pthread_mutex_destroy(&b->memfd_cache_mutex) == 0);
156
157         free(b);
158 }
159
160 int sd_bus_new(sd_bus **ret) {
161         sd_bus *r;
162
163         assert_return(ret, -EINVAL);
164
165         r = new0(sd_bus, 1);
166         if (!r)
167                 return -ENOMEM;
168
169         r->n_ref = REFCNT_INIT;
170         r->input_fd = r->output_fd = -1;
171         r->message_version = 1;
172         r->hello_flags |= KDBUS_HELLO_ACCEPT_FD;
173         r->original_pid = getpid();
174
175         assert_se(pthread_mutex_init(&r->memfd_cache_mutex, NULL) == 0);
176
177         /* We guarantee that wqueue always has space for at least one
178          * entry */
179         r->wqueue = new(sd_bus_message*, 1);
180         if (!r->wqueue) {
181                 free(r);
182                 return -ENOMEM;
183         }
184
185         *ret = r;
186         return 0;
187 }
188
189 int sd_bus_set_address(sd_bus *bus, const char *address) {
190         char *a;
191
192         assert_return(bus, -EINVAL);
193         assert_return(bus->state == BUS_UNSET, -EPERM);
194         assert_return(address, -EINVAL);
195         assert_return(!bus_pid_changed(bus), -ECHILD);
196
197         a = strdup(address);
198         if (!a)
199                 return -ENOMEM;
200
201         free(bus->address);
202         bus->address = a;
203
204         return 0;
205 }
206
207 int sd_bus_set_fd(sd_bus *bus, int input_fd, int output_fd) {
208         assert_return(bus, -EINVAL);
209         assert_return(bus->state == BUS_UNSET, -EPERM);
210         assert_return(input_fd >= 0, -EINVAL);
211         assert_return(output_fd >= 0, -EINVAL);
212         assert_return(!bus_pid_changed(bus), -ECHILD);
213
214         bus->input_fd = input_fd;
215         bus->output_fd = output_fd;
216         return 0;
217 }
218
219 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
220         char *p, **a;
221
222         assert_return(bus, -EINVAL);
223         assert_return(bus->state == BUS_UNSET, -EPERM);
224         assert_return(path, -EINVAL);
225         assert_return(!strv_isempty(argv), -EINVAL);
226         assert_return(!bus_pid_changed(bus), -ECHILD);
227
228         p = strdup(path);
229         if (!p)
230                 return -ENOMEM;
231
232         a = strv_copy(argv);
233         if (!a) {
234                 free(p);
235                 return -ENOMEM;
236         }
237
238         free(bus->exec_path);
239         strv_free(bus->exec_argv);
240
241         bus->exec_path = p;
242         bus->exec_argv = a;
243
244         return 0;
245 }
246
247 int sd_bus_set_bus_client(sd_bus *bus, int b) {
248         assert_return(bus, -EINVAL);
249         assert_return(bus->state == BUS_UNSET, -EPERM);
250         assert_return(!bus_pid_changed(bus), -ECHILD);
251
252         bus->bus_client = !!b;
253         return 0;
254 }
255
256 int sd_bus_negotiate_fds(sd_bus *bus, int b) {
257         assert_return(bus, -EINVAL);
258         assert_return(bus->state == BUS_UNSET, -EPERM);
259         assert_return(!bus_pid_changed(bus), -ECHILD);
260
261         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ACCEPT_FD, b);
262         return 0;
263 }
264
265 int sd_bus_negotiate_attach_comm(sd_bus *bus, int b) {
266         assert_return(bus, -EINVAL);
267         assert_return(bus->state == BUS_UNSET, -EPERM);
268         assert_return(!bus_pid_changed(bus), -ECHILD);
269
270         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_COMM, b);
271         return 0;
272 }
273
274 int sd_bus_negotiate_attach_exe(sd_bus *bus, int b) {
275         assert_return(bus, -EINVAL);
276         assert_return(bus->state == BUS_UNSET, -EPERM);
277         assert_return(!bus_pid_changed(bus), -ECHILD);
278
279         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_EXE, b);
280         return 0;
281 }
282
283 int sd_bus_negotiate_attach_cmdline(sd_bus *bus, int b) {
284         assert_return(bus, -EINVAL);
285         assert_return(bus->state == BUS_UNSET, -EPERM);
286         assert_return(!bus_pid_changed(bus), -ECHILD);
287
288         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CMDLINE, b);
289         return 0;
290 }
291
292 int sd_bus_negotiate_attach_cgroup(sd_bus *bus, int b) {
293         assert_return(bus, -EINVAL);
294         assert_return(bus->state == BUS_UNSET, -EPERM);
295         assert_return(!bus_pid_changed(bus), -ECHILD);
296
297         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CGROUP, b);
298         return 0;
299 }
300
301 int sd_bus_negotiate_attach_caps(sd_bus *bus, int b) {
302         assert_return(bus, -EINVAL);
303         assert_return(bus->state == BUS_UNSET, -EPERM);
304         assert_return(!bus_pid_changed(bus), -ECHILD);
305
306         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_CAPS, b);
307         return 0;
308 }
309
310 int sd_bus_negotiate_attach_selinux_context(sd_bus *bus, int b) {
311         assert_return(bus, -EINVAL);
312         assert_return(bus->state == BUS_UNSET, -EPERM);
313         assert_return(!bus_pid_changed(bus), -ECHILD);
314
315         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_SECLABEL, b);
316         return 0;
317 }
318
319 int sd_bus_negotiate_attach_audit(sd_bus *bus, int b) {
320         assert_return(bus, -EINVAL);
321         assert_return(bus->state == BUS_UNSET, -EPERM);
322         assert_return(!bus_pid_changed(bus), -ECHILD);
323
324         SET_FLAG(bus->hello_flags, KDBUS_HELLO_ATTACH_AUDIT, b);
325         return 0;
326 }
327
328 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server_id) {
329         assert_return(bus, -EINVAL);
330         assert_return(b || sd_id128_equal(server_id, SD_ID128_NULL), -EINVAL);
331         assert_return(bus->state == BUS_UNSET, -EPERM);
332         assert_return(!bus_pid_changed(bus), -ECHILD);
333
334         bus->is_server = !!b;
335         bus->server_id = server_id;
336         return 0;
337 }
338
339 int sd_bus_set_anonymous(sd_bus *bus, int b) {
340         assert_return(bus, -EINVAL);
341         assert_return(bus->state == BUS_UNSET, -EPERM);
342         assert_return(!bus_pid_changed(bus), -ECHILD);
343
344         bus->anonymous_auth = !!b;
345         return 0;
346 }
347
348 static int hello_callback(sd_bus *bus, sd_bus_message *reply, void *userdata) {
349         const char *s;
350         int r;
351
352         assert(bus);
353         assert(bus->state == BUS_HELLO);
354         assert(reply);
355
356         r = sd_bus_message_get_errno(reply);
357         if (r < 0)
358                 return r;
359         if (r > 0)
360                 return -r;
361
362         r = sd_bus_message_read(reply, "s", &s);
363         if (r < 0)
364                 return r;
365
366         if (!service_name_is_valid(s) || s[0] != ':')
367                 return -EBADMSG;
368
369         bus->unique_name = strdup(s);
370         if (!bus->unique_name)
371                 return -ENOMEM;
372
373         bus->state = BUS_RUNNING;
374
375         return 1;
376 }
377
378 static int bus_send_hello(sd_bus *bus) {
379         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
380         int r;
381
382         assert(bus);
383
384         if (!bus->bus_client || bus->is_kernel)
385                 return 0;
386
387         r = sd_bus_message_new_method_call(
388                         bus,
389                         "org.freedesktop.DBus",
390                         "/",
391                         "org.freedesktop.DBus",
392                         "Hello",
393                         &m);
394         if (r < 0)
395                 return r;
396
397         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
398 }
399
400 int bus_start_running(sd_bus *bus) {
401         assert(bus);
402
403         if (bus->bus_client && !bus->is_kernel) {
404                 bus->state = BUS_HELLO;
405                 return 1;
406         }
407
408         bus->state = BUS_RUNNING;
409         return 1;
410 }
411
412 static int parse_address_key(const char **p, const char *key, char **value) {
413         size_t l, n = 0;
414         const char *a;
415         char *r = NULL;
416
417         assert(p);
418         assert(*p);
419         assert(value);
420
421         if (key) {
422                 l = strlen(key);
423                 if (strncmp(*p, key, l) != 0)
424                         return 0;
425
426                 if ((*p)[l] != '=')
427                         return 0;
428
429                 if (*value)
430                         return -EINVAL;
431
432                 a = *p + l + 1;
433         } else
434                 a = *p;
435
436         while (*a != ';' && *a != ',' && *a != 0) {
437                 char c, *t;
438
439                 if (*a == '%') {
440                         int x, y;
441
442                         x = unhexchar(a[1]);
443                         if (x < 0) {
444                                 free(r);
445                                 return x;
446                         }
447
448                         y = unhexchar(a[2]);
449                         if (y < 0) {
450                                 free(r);
451                                 return y;
452                         }
453
454                         c = (char) ((x << 4) | y);
455                         a += 3;
456                 } else {
457                         c = *a;
458                         a++;
459                 }
460
461                 t = realloc(r, n + 2);
462                 if (!t) {
463                         free(r);
464                         return -ENOMEM;
465                 }
466
467                 r = t;
468                 r[n++] = c;
469         }
470
471         if (!r) {
472                 r = strdup("");
473                 if (!r)
474                         return -ENOMEM;
475         } else
476                 r[n] = 0;
477
478         if (*a == ',')
479                 a++;
480
481         *p = a;
482
483         free(*value);
484         *value = r;
485
486         return 1;
487 }
488
489 static void skip_address_key(const char **p) {
490         assert(p);
491         assert(*p);
492
493         *p += strcspn(*p, ",");
494
495         if (**p == ',')
496                 (*p) ++;
497 }
498
499 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
500         _cleanup_free_ char *path = NULL, *abstract = NULL;
501         size_t l;
502         int r;
503
504         assert(b);
505         assert(p);
506         assert(*p);
507         assert(guid);
508
509         while (**p != 0 && **p != ';') {
510                 r = parse_address_key(p, "guid", guid);
511                 if (r < 0)
512                         return r;
513                 else if (r > 0)
514                         continue;
515
516                 r = parse_address_key(p, "path", &path);
517                 if (r < 0)
518                         return r;
519                 else if (r > 0)
520                         continue;
521
522                 r = parse_address_key(p, "abstract", &abstract);
523                 if (r < 0)
524                         return r;
525                 else if (r > 0)
526                         continue;
527
528                 skip_address_key(p);
529         }
530
531         if (!path && !abstract)
532                 return -EINVAL;
533
534         if (path && abstract)
535                 return -EINVAL;
536
537         if (path) {
538                 l = strlen(path);
539                 if (l > sizeof(b->sockaddr.un.sun_path))
540                         return -E2BIG;
541
542                 b->sockaddr.un.sun_family = AF_UNIX;
543                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
544                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
545         } else if (abstract) {
546                 l = strlen(abstract);
547                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
548                         return -E2BIG;
549
550                 b->sockaddr.un.sun_family = AF_UNIX;
551                 b->sockaddr.un.sun_path[0] = 0;
552                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
553                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
554         }
555
556         return 0;
557 }
558
559 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
560         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
561         int r;
562         struct addrinfo *result, hints = {
563                 .ai_socktype = SOCK_STREAM,
564                 .ai_flags = AI_ADDRCONFIG,
565         };
566
567         assert(b);
568         assert(p);
569         assert(*p);
570         assert(guid);
571
572         while (**p != 0 && **p != ';') {
573                 r = parse_address_key(p, "guid", guid);
574                 if (r < 0)
575                         return r;
576                 else if (r > 0)
577                         continue;
578
579                 r = parse_address_key(p, "host", &host);
580                 if (r < 0)
581                         return r;
582                 else if (r > 0)
583                         continue;
584
585                 r = parse_address_key(p, "port", &port);
586                 if (r < 0)
587                         return r;
588                 else if (r > 0)
589                         continue;
590
591                 r = parse_address_key(p, "family", &family);
592                 if (r < 0)
593                         return r;
594                 else if (r > 0)
595                         continue;
596
597                 skip_address_key(p);
598         }
599
600         if (!host || !port)
601                 return -EINVAL;
602
603         if (family) {
604                 if (streq(family, "ipv4"))
605                         hints.ai_family = AF_INET;
606                 else if (streq(family, "ipv6"))
607                         hints.ai_family = AF_INET6;
608                 else
609                         return -EINVAL;
610         }
611
612         r = getaddrinfo(host, port, &hints, &result);
613         if (r == EAI_SYSTEM)
614                 return -errno;
615         else if (r != 0)
616                 return -EADDRNOTAVAIL;
617
618         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
619         b->sockaddr_size = result->ai_addrlen;
620
621         freeaddrinfo(result);
622
623         return 0;
624 }
625
626 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
627         char *path = NULL;
628         unsigned n_argv = 0, j;
629         char **argv = NULL;
630         int r;
631
632         assert(b);
633         assert(p);
634         assert(*p);
635         assert(guid);
636
637         while (**p != 0 && **p != ';') {
638                 r = parse_address_key(p, "guid", guid);
639                 if (r < 0)
640                         goto fail;
641                 else if (r > 0)
642                         continue;
643
644                 r = parse_address_key(p, "path", &path);
645                 if (r < 0)
646                         goto fail;
647                 else if (r > 0)
648                         continue;
649
650                 if (startswith(*p, "argv")) {
651                         unsigned ul;
652
653                         errno = 0;
654                         ul = strtoul(*p + 4, (char**) p, 10);
655                         if (errno > 0 || **p != '=' || ul > 256) {
656                                 r = -EINVAL;
657                                 goto fail;
658                         }
659
660                         (*p) ++;
661
662                         if (ul >= n_argv) {
663                                 char **x;
664
665                                 x = realloc(argv, sizeof(char*) * (ul + 2));
666                                 if (!x) {
667                                         r = -ENOMEM;
668                                         goto fail;
669                                 }
670
671                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
672
673                                 argv = x;
674                                 n_argv = ul + 1;
675                         }
676
677                         r = parse_address_key(p, NULL, argv + ul);
678                         if (r < 0)
679                                 goto fail;
680
681                         continue;
682                 }
683
684                 skip_address_key(p);
685         }
686
687         if (!path) {
688                 r = -EINVAL;
689                 goto fail;
690         }
691
692         /* Make sure there are no holes in the array, with the
693          * exception of argv[0] */
694         for (j = 1; j < n_argv; j++)
695                 if (!argv[j]) {
696                         r = -EINVAL;
697                         goto fail;
698                 }
699
700         if (argv && argv[0] == NULL) {
701                 argv[0] = strdup(path);
702                 if (!argv[0]) {
703                         r = -ENOMEM;
704                         goto fail;
705                 }
706         }
707
708         b->exec_path = path;
709         b->exec_argv = argv;
710         return 0;
711
712 fail:
713         for (j = 0; j < n_argv; j++)
714                 free(argv[j]);
715
716         free(argv);
717         free(path);
718         return r;
719 }
720
721 static int parse_kernel_address(sd_bus *b, const char **p, char **guid) {
722         _cleanup_free_ char *path = NULL;
723         int r;
724
725         assert(b);
726         assert(p);
727         assert(*p);
728         assert(guid);
729
730         while (**p != 0 && **p != ';') {
731                 r = parse_address_key(p, "guid", guid);
732                 if (r < 0)
733                         return r;
734                 else if (r > 0)
735                         continue;
736
737                 r = parse_address_key(p, "path", &path);
738                 if (r < 0)
739                         return r;
740                 else if (r > 0)
741                         continue;
742
743                 skip_address_key(p);
744         }
745
746         if (!path)
747                 return -EINVAL;
748
749         free(b->kernel);
750         b->kernel = path;
751         path = NULL;
752
753         return 0;
754 }
755
756 static void bus_reset_parsed_address(sd_bus *b) {
757         assert(b);
758
759         zero(b->sockaddr);
760         b->sockaddr_size = 0;
761         strv_free(b->exec_argv);
762         free(b->exec_path);
763         b->exec_path = NULL;
764         b->exec_argv = NULL;
765         b->server_id = SD_ID128_NULL;
766         free(b->kernel);
767         b->kernel = NULL;
768 }
769
770 static int bus_parse_next_address(sd_bus *b) {
771         _cleanup_free_ char *guid = NULL;
772         const char *a;
773         int r;
774
775         assert(b);
776
777         if (!b->address)
778                 return 0;
779         if (b->address[b->address_index] == 0)
780                 return 0;
781
782         bus_reset_parsed_address(b);
783
784         a = b->address + b->address_index;
785
786         while (*a != 0) {
787
788                 if (*a == ';') {
789                         a++;
790                         continue;
791                 }
792
793                 if (startswith(a, "unix:")) {
794                         a += 5;
795
796                         r = parse_unix_address(b, &a, &guid);
797                         if (r < 0)
798                                 return r;
799                         break;
800
801                 } else if (startswith(a, "tcp:")) {
802
803                         a += 4;
804                         r = parse_tcp_address(b, &a, &guid);
805                         if (r < 0)
806                                 return r;
807
808                         break;
809
810                 } else if (startswith(a, "unixexec:")) {
811
812                         a += 9;
813                         r = parse_exec_address(b, &a, &guid);
814                         if (r < 0)
815                                 return r;
816
817                         break;
818
819                 } else if (startswith(a, "kernel:")) {
820
821                         a += 7;
822                         r = parse_kernel_address(b, &a, &guid);
823                         if (r < 0)
824                                 return r;
825
826                         break;
827                 }
828
829                 a = strchr(a, ';');
830                 if (!a)
831                         return 0;
832         }
833
834         if (guid) {
835                 r = sd_id128_from_string(guid, &b->server_id);
836                 if (r < 0)
837                         return r;
838         }
839
840         b->address_index = a - b->address;
841         return 1;
842 }
843
844 static int bus_start_address(sd_bus *b) {
845         int r;
846
847         assert(b);
848
849         for (;;) {
850                 sd_bus_close(b);
851
852                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
853
854                         r = bus_socket_connect(b);
855                         if (r >= 0)
856                                 return r;
857
858                         b->last_connect_error = -r;
859
860                 } else if (b->exec_path) {
861
862                         r = bus_socket_exec(b);
863                         if (r >= 0)
864                                 return r;
865
866                         b->last_connect_error = -r;
867                 } else if (b->kernel) {
868
869                         r = bus_kernel_connect(b);
870                         if (r >= 0)
871                                 return r;
872
873                         b->last_connect_error = -r;
874                 }
875
876                 r = bus_parse_next_address(b);
877                 if (r < 0)
878                         return r;
879                 if (r == 0)
880                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
881         }
882 }
883
884 int bus_next_address(sd_bus *b) {
885         assert(b);
886
887         bus_reset_parsed_address(b);
888         return bus_start_address(b);
889 }
890
891 static int bus_start_fd(sd_bus *b) {
892         struct stat st;
893         int r;
894
895         assert(b);
896         assert(b->input_fd >= 0);
897         assert(b->output_fd >= 0);
898
899         r = fd_nonblock(b->input_fd, true);
900         if (r < 0)
901                 return r;
902
903         r = fd_cloexec(b->input_fd, true);
904         if (r < 0)
905                 return r;
906
907         if (b->input_fd != b->output_fd) {
908                 r = fd_nonblock(b->output_fd, true);
909                 if (r < 0)
910                         return r;
911
912                 r = fd_cloexec(b->output_fd, true);
913                 if (r < 0)
914                         return r;
915         }
916
917         if (fstat(b->input_fd, &st) < 0)
918                 return -errno;
919
920         if (S_ISCHR(b->input_fd))
921                 return bus_kernel_take_fd(b);
922         else
923                 return bus_socket_take_fd(b);
924 }
925
926 int sd_bus_start(sd_bus *bus) {
927         int r;
928
929         assert_return(bus, -EINVAL);
930         assert_return(bus->state == BUS_UNSET, -EPERM);
931         assert_return(!bus_pid_changed(bus), -ECHILD);
932
933         bus->state = BUS_OPENING;
934
935         if (bus->is_server && bus->bus_client)
936                 return -EINVAL;
937
938         if (bus->input_fd >= 0)
939                 r = bus_start_fd(bus);
940         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path || bus->kernel)
941                 r = bus_start_address(bus);
942         else
943                 return -EINVAL;
944
945         if (r < 0)
946                 return r;
947
948         return bus_send_hello(bus);
949 }
950
951 int sd_bus_open_system(sd_bus **ret) {
952         const char *e;
953         sd_bus *b;
954         int r;
955
956         assert_return(ret, -EINVAL);
957
958         r = sd_bus_new(&b);
959         if (r < 0)
960                 return r;
961
962         e = secure_getenv("DBUS_SYSTEM_BUS_ADDRESS");
963         if (e) {
964                 r = sd_bus_set_address(b, e);
965                 if (r < 0)
966                         goto fail;
967         } else {
968                 b->sockaddr.un.sun_family = AF_UNIX;
969                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
970                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
971         }
972
973         b->bus_client = true;
974
975         r = sd_bus_start(b);
976         if (r < 0)
977                 goto fail;
978
979         *ret = b;
980         return 0;
981
982 fail:
983         bus_free(b);
984         return r;
985 }
986
987 int sd_bus_open_user(sd_bus **ret) {
988         const char *e;
989         sd_bus *b;
990         size_t l;
991         int r;
992
993         assert_return(ret, -EINVAL);
994
995         r = sd_bus_new(&b);
996         if (r < 0)
997                 return r;
998
999         e = secure_getenv("DBUS_SESSION_BUS_ADDRESS");
1000         if (e) {
1001                 r = sd_bus_set_address(b, e);
1002                 if (r < 0)
1003                         goto fail;
1004         } else {
1005                 e = secure_getenv("XDG_RUNTIME_DIR");
1006                 if (!e) {
1007                         r = -ENOENT;
1008                         goto fail;
1009                 }
1010
1011                 l = strlen(e);
1012                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
1013                         r = -E2BIG;
1014                         goto fail;
1015                 }
1016
1017                 b->sockaddr.un.sun_family = AF_UNIX;
1018                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
1019                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
1020         }
1021
1022         b->bus_client = true;
1023
1024         r = sd_bus_start(b);
1025         if (r < 0)
1026                 goto fail;
1027
1028         *ret = b;
1029         return 0;
1030
1031 fail:
1032         bus_free(b);
1033         return r;
1034 }
1035
1036 int sd_bus_open_system_remote(const char *host, sd_bus **ret) {
1037         _cleanup_free_ char *e = NULL;
1038         char *p = NULL;
1039         sd_bus *bus;
1040         int r;
1041
1042         assert_return(host, -EINVAL);
1043         assert_return(ret, -EINVAL);
1044
1045         e = bus_address_escape(host);
1046         if (!e)
1047                 return -ENOMEM;
1048
1049         p = strjoin("unixexec:path=ssh,argv1=-xT,argv2=", e, ",argv3=systemd-stdio-bridge", NULL);
1050         if (!p)
1051                 return -ENOMEM;
1052
1053         r = sd_bus_new(&bus);
1054         if (r < 0) {
1055                 free(p);
1056                 return r;
1057         }
1058
1059         bus->address = p;
1060         bus->bus_client = true;
1061
1062         r = sd_bus_start(bus);
1063         if (r < 0) {
1064                 bus_free(bus);
1065                 return r;
1066         }
1067
1068         *ret = bus;
1069         return 0;
1070 }
1071
1072 void sd_bus_close(sd_bus *bus) {
1073         if (!bus)
1074                 return;
1075         if (bus->state == BUS_CLOSED)
1076                 return;
1077         if (bus_pid_changed(bus))
1078                 return;
1079
1080         bus->state = BUS_CLOSED;
1081
1082         sd_bus_detach_event(bus);
1083
1084         if (!bus->is_kernel)
1085                 bus_close_fds(bus);
1086
1087         /* We'll leave the fd open in case this is a kernel bus, since
1088          * there might still be memblocks around that reference this
1089          * bus, and they might need to invoke the
1090          * KDBUS_CMD_MSG_RELEASE ioctl on the fd when they are
1091          * freed. */
1092 }
1093
1094 sd_bus *sd_bus_ref(sd_bus *bus) {
1095         if (!bus)
1096                 return NULL;
1097
1098         assert_se(REFCNT_INC(bus->n_ref) >= 2);
1099
1100         return bus;
1101 }
1102
1103 sd_bus *sd_bus_unref(sd_bus *bus) {
1104         if (!bus)
1105                 return NULL;
1106
1107         if (REFCNT_DEC(bus->n_ref) <= 0)
1108                 bus_free(bus);
1109
1110         return NULL;
1111 }
1112
1113 int sd_bus_is_open(sd_bus *bus) {
1114
1115         assert_return(bus, -EINVAL);
1116         assert_return(!bus_pid_changed(bus), -ECHILD);
1117
1118         return BUS_IS_OPEN(bus->state);
1119 }
1120
1121 int sd_bus_can_send(sd_bus *bus, char type) {
1122         int r;
1123
1124         assert_return(bus, -EINVAL);
1125         assert_return(bus->state != BUS_UNSET, -ENOTCONN);
1126         assert_return(!bus_pid_changed(bus), -ECHILD);
1127
1128         if (type == SD_BUS_TYPE_UNIX_FD) {
1129                 if (!(bus->hello_flags & KDBUS_HELLO_ACCEPT_FD))
1130                         return 0;
1131
1132                 r = bus_ensure_running(bus);
1133                 if (r < 0)
1134                         return r;
1135
1136                 return bus->can_fds;
1137         }
1138
1139         return bus_type_is_valid(type);
1140 }
1141
1142 int sd_bus_get_server_id(sd_bus *bus, sd_id128_t *server_id) {
1143         int r;
1144
1145         assert_return(bus, -EINVAL);
1146         assert_return(server_id, -EINVAL);
1147         assert_return(!bus_pid_changed(bus), -ECHILD);
1148
1149         r = bus_ensure_running(bus);
1150         if (r < 0)
1151                 return r;
1152
1153         *server_id = bus->server_id;
1154         return 0;
1155 }
1156
1157 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
1158         assert(m);
1159
1160         if (m->header->version > b->message_version)
1161                 return -EPERM;
1162
1163         if (m->sealed)
1164                 return 0;
1165
1166         return bus_message_seal(m, ++b->serial);
1167 }
1168
1169 static int dispatch_wqueue(sd_bus *bus) {
1170         int r, ret = 0;
1171
1172         assert(bus);
1173         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1174
1175         while (bus->wqueue_size > 0) {
1176
1177                 if (bus->is_kernel)
1178                         r = bus_kernel_write_message(bus, bus->wqueue[0]);
1179                 else
1180                         r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
1181
1182                 if (r < 0) {
1183                         sd_bus_close(bus);
1184                         return r;
1185                 } else if (r == 0)
1186                         /* Didn't do anything this time */
1187                         return ret;
1188                 else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
1189                         /* Fully written. Let's drop the entry from
1190                          * the queue.
1191                          *
1192                          * This isn't particularly optimized, but
1193                          * well, this is supposed to be our worst-case
1194                          * buffer only, and the socket buffer is
1195                          * supposed to be our primary buffer, and if
1196                          * it got full, then all bets are off
1197                          * anyway. */
1198
1199                         sd_bus_message_unref(bus->wqueue[0]);
1200                         bus->wqueue_size --;
1201                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1202                         bus->windex = 0;
1203
1204                         ret = 1;
1205                 }
1206         }
1207
1208         return ret;
1209 }
1210
1211 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1212         sd_bus_message *z = NULL;
1213         int r, ret = 0;
1214
1215         assert(bus);
1216         assert(m);
1217         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1218
1219         if (bus->rqueue_size > 0) {
1220                 /* Dispatch a queued message */
1221
1222                 *m = bus->rqueue[0];
1223                 bus->rqueue_size --;
1224                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1225                 return 1;
1226         }
1227
1228         /* Try to read a new message */
1229         do {
1230                 if (bus->is_kernel)
1231                         r = bus_kernel_read_message(bus, &z);
1232                 else
1233                         r = bus_socket_read_message(bus, &z);
1234
1235                 if (r < 0) {
1236                         sd_bus_close(bus);
1237                         return r;
1238                 }
1239                 if (r == 0)
1240                         return ret;
1241
1242                 ret = 1;
1243         } while (!z);
1244
1245         *m = z;
1246         return ret;
1247 }
1248
1249 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1250         int r;
1251
1252         assert_return(bus, -EINVAL);
1253         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1254         assert_return(m, -EINVAL);
1255         assert_return(!bus_pid_changed(bus), -ECHILD);
1256
1257         if (m->n_fds > 0) {
1258                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1259                 if (r < 0)
1260                         return r;
1261                 if (r == 0)
1262                         return -ENOTSUP;
1263         }
1264
1265         /* If the serial number isn't kept, then we know that no reply
1266          * is expected */
1267         if (!serial && !m->sealed)
1268                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1269
1270         r = bus_seal_message(bus, m);
1271         if (r < 0)
1272                 return r;
1273
1274         /* If this is a reply and no reply was requested, then let's
1275          * suppress this, if we can */
1276         if (m->dont_send && !serial)
1277                 return 1;
1278
1279         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1280                 size_t idx = 0;
1281
1282                 if (bus->is_kernel)
1283                         r = bus_kernel_write_message(bus, m);
1284                 else
1285                         r = bus_socket_write_message(bus, m, &idx);
1286
1287                 if (r < 0) {
1288                         sd_bus_close(bus);
1289                         return r;
1290                 } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m))  {
1291                         /* Wasn't fully written. So let's remember how
1292                          * much was written. Note that the first entry
1293                          * of the wqueue array is always allocated so
1294                          * that we always can remember how much was
1295                          * written. */
1296                         bus->wqueue[0] = sd_bus_message_ref(m);
1297                         bus->wqueue_size = 1;
1298                         bus->windex = idx;
1299                 }
1300         } else {
1301                 sd_bus_message **q;
1302
1303                 /* Just append it to the queue. */
1304
1305                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1306                         return -ENOBUFS;
1307
1308                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1309                 if (!q)
1310                         return -ENOMEM;
1311
1312                 bus->wqueue = q;
1313                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1314         }
1315
1316         if (serial)
1317                 *serial = BUS_MESSAGE_SERIAL(m);
1318
1319         return 1;
1320 }
1321
1322 static usec_t calc_elapse(uint64_t usec) {
1323         if (usec == (uint64_t) -1)
1324                 return 0;
1325
1326         if (usec == 0)
1327                 usec = BUS_DEFAULT_TIMEOUT;
1328
1329         return now(CLOCK_MONOTONIC) + usec;
1330 }
1331
1332 static int timeout_compare(const void *a, const void *b) {
1333         const struct reply_callback *x = a, *y = b;
1334
1335         if (x->timeout != 0 && y->timeout == 0)
1336                 return -1;
1337
1338         if (x->timeout == 0 && y->timeout != 0)
1339                 return 1;
1340
1341         if (x->timeout < y->timeout)
1342                 return -1;
1343
1344         if (x->timeout > y->timeout)
1345                 return 1;
1346
1347         return 0;
1348 }
1349
1350 int sd_bus_send_with_reply(
1351                 sd_bus *bus,
1352                 sd_bus_message *m,
1353                 sd_bus_message_handler_t callback,
1354                 void *userdata,
1355                 uint64_t usec,
1356                 uint64_t *serial) {
1357
1358         struct reply_callback *c;
1359         int r;
1360
1361         assert_return(bus, -EINVAL);
1362         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1363         assert_return(m, -EINVAL);
1364         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1365         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1366         assert_return(callback, -EINVAL);
1367         assert_return(!bus_pid_changed(bus), -ECHILD);
1368
1369         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1370         if (r < 0)
1371                 return r;
1372
1373         if (usec != (uint64_t) -1) {
1374                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1375                 if (r < 0)
1376                         return r;
1377         }
1378
1379         r = bus_seal_message(bus, m);
1380         if (r < 0)
1381                 return r;
1382
1383         c = new0(struct reply_callback, 1);
1384         if (!c)
1385                 return -ENOMEM;
1386
1387         c->callback = callback;
1388         c->userdata = userdata;
1389         c->serial = BUS_MESSAGE_SERIAL(m);
1390         c->timeout = calc_elapse(usec);
1391
1392         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1393         if (r < 0) {
1394                 free(c);
1395                 return r;
1396         }
1397
1398         if (c->timeout != 0) {
1399                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1400                 if (r < 0) {
1401                         c->timeout = 0;
1402                         sd_bus_send_with_reply_cancel(bus, c->serial);
1403                         return r;
1404                 }
1405         }
1406
1407         r = sd_bus_send(bus, m, serial);
1408         if (r < 0) {
1409                 sd_bus_send_with_reply_cancel(bus, c->serial);
1410                 return r;
1411         }
1412
1413         return r;
1414 }
1415
1416 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1417         struct reply_callback *c;
1418
1419         assert_return(bus, -EINVAL);
1420         assert_return(serial != 0, -EINVAL);
1421         assert_return(!bus_pid_changed(bus), -ECHILD);
1422
1423         c = hashmap_remove(bus->reply_callbacks, &serial);
1424         if (!c)
1425                 return 0;
1426
1427         if (c->timeout != 0)
1428                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1429
1430         free(c);
1431         return 1;
1432 }
1433
1434 int bus_ensure_running(sd_bus *bus) {
1435         int r;
1436
1437         assert(bus);
1438
1439         if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
1440                 return -ENOTCONN;
1441         if (bus->state == BUS_RUNNING)
1442                 return 1;
1443
1444         for (;;) {
1445                 r = sd_bus_process(bus, NULL);
1446                 if (r < 0)
1447                         return r;
1448                 if (bus->state == BUS_RUNNING)
1449                         return 1;
1450                 if (r > 0)
1451                         continue;
1452
1453                 r = sd_bus_wait(bus, (uint64_t) -1);
1454                 if (r < 0)
1455                         return r;
1456         }
1457 }
1458
1459 int sd_bus_send_with_reply_and_block(
1460                 sd_bus *bus,
1461                 sd_bus_message *m,
1462                 uint64_t usec,
1463                 sd_bus_error *error,
1464                 sd_bus_message **reply) {
1465
1466         int r;
1467         usec_t timeout;
1468         uint64_t serial;
1469         bool room = false;
1470
1471         assert_return(bus, -EINVAL);
1472         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1473         assert_return(m, -EINVAL);
1474         assert_return(m->header->type == SD_BUS_MESSAGE_METHOD_CALL, -EINVAL);
1475         assert_return(!(m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED), -EINVAL);
1476         assert_return(!bus_error_is_dirty(error), -EINVAL);
1477         assert_return(!bus_pid_changed(bus), -ECHILD);
1478
1479         r = bus_ensure_running(bus);
1480         if (r < 0)
1481                 return r;
1482
1483         r = sd_bus_send(bus, m, &serial);
1484         if (r < 0)
1485                 return r;
1486
1487         timeout = calc_elapse(usec);
1488
1489         for (;;) {
1490                 usec_t left;
1491                 sd_bus_message *incoming = NULL;
1492
1493                 if (!room) {
1494                         sd_bus_message **q;
1495
1496                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1497                                 return -ENOBUFS;
1498
1499                         /* Make sure there's room for queuing this
1500                          * locally, before we read the message */
1501
1502                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1503                         if (!q)
1504                                 return -ENOMEM;
1505
1506                         bus->rqueue = q;
1507                         room = true;
1508                 }
1509
1510                 if (bus->is_kernel)
1511                         r = bus_kernel_read_message(bus, &incoming);
1512                 else
1513                         r = bus_socket_read_message(bus, &incoming);
1514                 if (r < 0)
1515                         return r;
1516                 if (incoming) {
1517
1518                         if (incoming->reply_serial == serial) {
1519                                 /* Found a match! */
1520
1521                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_RETURN) {
1522
1523                                         if (reply)
1524                                                 *reply = incoming;
1525                                         else
1526                                                 sd_bus_message_unref(incoming);
1527
1528                                         return 1;
1529                                 }
1530
1531                                 if (incoming->header->type == SD_BUS_MESSAGE_METHOD_ERROR) {
1532                                         int k;
1533
1534                                         r = sd_bus_error_copy(error, &incoming->error);
1535                                         if (r < 0) {
1536                                                 sd_bus_message_unref(incoming);
1537                                                 return r;
1538                                         }
1539
1540                                         k = sd_bus_error_get_errno(&incoming->error);
1541                                         sd_bus_message_unref(incoming);
1542                                         return -k;
1543                                 }
1544
1545                                 sd_bus_message_unref(incoming);
1546                                 return -EIO;
1547                         }
1548
1549                         /* There's already guaranteed to be room for
1550                          * this, so need to resize things here */
1551                         bus->rqueue[bus->rqueue_size ++] = incoming;
1552                         room = false;
1553
1554                         /* Try to read more, right-away */
1555                         continue;
1556                 }
1557                 if (r != 0)
1558                         continue;
1559
1560                 if (timeout > 0) {
1561                         usec_t n;
1562
1563                         n = now(CLOCK_MONOTONIC);
1564                         if (n >= timeout)
1565                                 return -ETIMEDOUT;
1566
1567                         left = timeout - n;
1568                 } else
1569                         left = (uint64_t) -1;
1570
1571                 r = bus_poll(bus, true, left);
1572                 if (r < 0)
1573                         return r;
1574
1575                 r = dispatch_wqueue(bus);
1576                 if (r < 0)
1577                         return r;
1578         }
1579 }
1580
1581 int sd_bus_get_fd(sd_bus *bus) {
1582
1583         assert_return(bus, -EINVAL);
1584         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1585         assert_return(bus->input_fd == bus->output_fd, -EPERM);
1586         assert_return(!bus_pid_changed(bus), -ECHILD);
1587
1588         return bus->input_fd;
1589 }
1590
1591 int sd_bus_get_events(sd_bus *bus) {
1592         int flags = 0;
1593
1594         assert_return(bus, -EINVAL);
1595         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1596         assert_return(!bus_pid_changed(bus), -ECHILD);
1597
1598         if (bus->state == BUS_OPENING)
1599                 flags |= POLLOUT;
1600         else if (bus->state == BUS_AUTHENTICATING) {
1601
1602                 if (bus_socket_auth_needs_write(bus))
1603                         flags |= POLLOUT;
1604
1605                 flags |= POLLIN;
1606
1607         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1608                 if (bus->rqueue_size <= 0)
1609                         flags |= POLLIN;
1610                 if (bus->wqueue_size > 0)
1611                         flags |= POLLOUT;
1612         }
1613
1614         return flags;
1615 }
1616
1617 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1618         struct reply_callback *c;
1619
1620         assert_return(bus, -EINVAL);
1621         assert_return(timeout_usec, -EINVAL);
1622         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1623         assert_return(!bus_pid_changed(bus), -ECHILD);
1624
1625         if (bus->state == BUS_AUTHENTICATING) {
1626                 *timeout_usec = bus->auth_timeout;
1627                 return 1;
1628         }
1629
1630         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO) {
1631                 *timeout_usec = (uint64_t) -1;
1632                 return 0;
1633         }
1634
1635         if (bus->rqueue_size > 0) {
1636                 *timeout_usec = 0;
1637                 return 1;
1638         }
1639
1640         c = prioq_peek(bus->reply_callbacks_prioq);
1641         if (!c) {
1642                 *timeout_usec = (uint64_t) -1;
1643                 return 0;
1644         }
1645
1646         *timeout_usec = c->timeout;
1647         return 1;
1648 }
1649
1650 static int process_timeout(sd_bus *bus) {
1651         _cleanup_bus_message_unref_ sd_bus_message* m = NULL;
1652         struct reply_callback *c;
1653         usec_t n;
1654         int r;
1655
1656         assert(bus);
1657
1658         c = prioq_peek(bus->reply_callbacks_prioq);
1659         if (!c)
1660                 return 0;
1661
1662         n = now(CLOCK_MONOTONIC);
1663         if (c->timeout > n)
1664                 return 0;
1665
1666         r = bus_message_new_synthetic_error(
1667                         bus,
1668                         c->serial,
1669                         &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Method call timed out"),
1670                         &m);
1671         if (r < 0)
1672                 return r;
1673
1674         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1675         hashmap_remove(bus->reply_callbacks, &c->serial);
1676
1677         r = c->callback(bus, m, c->userdata);
1678         free(c);
1679
1680         return r < 0 ? r : 1;
1681 }
1682
1683 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1684         assert(bus);
1685         assert(m);
1686
1687         if (bus->state != BUS_HELLO)
1688                 return 0;
1689
1690         /* Let's make sure the first message on the bus is the HELLO
1691          * reply. But note that we don't actually parse the message
1692          * here (we leave that to the usual handling), we just verify
1693          * we don't let any earlier msg through. */
1694
1695         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1696             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1697                 return -EIO;
1698
1699         if (m->reply_serial != bus->hello_serial)
1700                 return -EIO;
1701
1702         return 0;
1703 }
1704
1705 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1706         struct reply_callback *c;
1707         int r;
1708
1709         assert(bus);
1710         assert(m);
1711
1712         if (m->header->type != SD_BUS_MESSAGE_METHOD_RETURN &&
1713             m->header->type != SD_BUS_MESSAGE_METHOD_ERROR)
1714                 return 0;
1715
1716         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1717         if (!c)
1718                 return 0;
1719
1720         if (c->timeout != 0)
1721                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1722
1723         r = sd_bus_message_rewind(m, true);
1724         if (r < 0)
1725                 return r;
1726
1727         r = c->callback(bus, m, c->userdata);
1728         free(c);
1729
1730         return r;
1731 }
1732
1733 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1734         struct filter_callback *l;
1735         int r;
1736
1737         assert(bus);
1738         assert(m);
1739
1740         do {
1741                 bus->filter_callbacks_modified = false;
1742
1743                 LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1744
1745                         if (bus->filter_callbacks_modified)
1746                                 break;
1747
1748                         /* Don't run this more than once per iteration */
1749                         if (l->last_iteration == bus->iteration_counter)
1750                                 continue;
1751
1752                         l->last_iteration = bus->iteration_counter;
1753
1754                         r = sd_bus_message_rewind(m, true);
1755                         if (r < 0)
1756                                 return r;
1757
1758                         r = l->callback(bus, m, l->userdata);
1759                         if (r != 0)
1760                                 return r;
1761
1762                 }
1763
1764         } while (bus->filter_callbacks_modified);
1765
1766         return 0;
1767 }
1768
1769 static int process_match(sd_bus *bus, sd_bus_message *m) {
1770         int r;
1771
1772         assert(bus);
1773         assert(m);
1774
1775         do {
1776                 bus->match_callbacks_modified = false;
1777
1778                 r = bus_match_run(bus, &bus->match_callbacks, m);
1779                 if (r != 0)
1780                         return r;
1781
1782         } while (bus->match_callbacks_modified);
1783
1784         return 0;
1785 }
1786
1787 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1788         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1789         int r;
1790
1791         assert(bus);
1792         assert(m);
1793
1794         if (m->header->type != SD_BUS_MESSAGE_METHOD_CALL)
1795                 return 0;
1796
1797         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1798                 return 0;
1799
1800         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1801                 return 1;
1802
1803         if (streq_ptr(m->member, "Ping"))
1804                 r = sd_bus_message_new_method_return(bus, m, &reply);
1805         else if (streq_ptr(m->member, "GetMachineId")) {
1806                 sd_id128_t id;
1807                 char sid[33];
1808
1809                 r = sd_id128_get_machine(&id);
1810                 if (r < 0)
1811                         return r;
1812
1813                 r = sd_bus_message_new_method_return(bus, m, &reply);
1814                 if (r < 0)
1815                         return r;
1816
1817                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1818         } else {
1819                 r = sd_bus_message_new_method_errorf(
1820                                 bus, m, &reply,
1821                                 SD_BUS_ERROR_UNKNOWN_METHOD,
1822                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1823         }
1824
1825         if (r < 0)
1826                 return r;
1827
1828         r = sd_bus_send(bus, reply, NULL);
1829         if (r < 0)
1830                 return r;
1831
1832         return 1;
1833 }
1834
1835 static int process_message(sd_bus *bus, sd_bus_message *m) {
1836         int r;
1837
1838         assert(bus);
1839         assert(m);
1840
1841         bus->iteration_counter++;
1842
1843         log_debug("Got message sender=%s object=%s interface=%s member=%s",
1844                   strna(sd_bus_message_get_sender(m)),
1845                   strna(sd_bus_message_get_path(m)),
1846                   strna(sd_bus_message_get_interface(m)),
1847                   strna(sd_bus_message_get_member(m)));
1848
1849         r = process_hello(bus, m);
1850         if (r != 0)
1851                 return r;
1852
1853         r = process_reply(bus, m);
1854         if (r != 0)
1855                 return r;
1856
1857         r = process_filter(bus, m);
1858         if (r != 0)
1859                 return r;
1860
1861         r = process_match(bus, m);
1862         if (r != 0)
1863                 return r;
1864
1865         r = process_builtin(bus, m);
1866         if (r != 0)
1867                 return r;
1868
1869         return bus_process_object(bus, m);
1870 }
1871
1872 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1873         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1874         int r;
1875
1876         assert(bus);
1877         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1878
1879         r = process_timeout(bus);
1880         if (r != 0)
1881                 goto null_message;
1882
1883         r = dispatch_wqueue(bus);
1884         if (r != 0)
1885                 goto null_message;
1886
1887         r = dispatch_rqueue(bus, &m);
1888         if (r < 0)
1889                 return r;
1890         if (!m)
1891                 goto null_message;
1892
1893         r = process_message(bus, m);
1894         if (r != 0)
1895                 goto null_message;
1896
1897         if (ret) {
1898                 r = sd_bus_message_rewind(m, true);
1899                 if (r < 0)
1900                         return r;
1901
1902                 *ret = m;
1903                 m = NULL;
1904                 return 1;
1905         }
1906
1907         if (m->header->type == SD_BUS_MESSAGE_METHOD_CALL) {
1908
1909                 r = sd_bus_reply_method_errorf(
1910                                 bus, m,
1911                                 SD_BUS_ERROR_UNKNOWN_OBJECT,
1912                                 "Unknown object '%s'.", m->path);
1913                 if (r < 0)
1914                         return r;
1915         }
1916
1917         return 1;
1918
1919 null_message:
1920         if (r >= 0 && ret)
1921                 *ret = NULL;
1922
1923         return r;
1924 }
1925
1926 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1927         BUS_DONT_DESTROY(bus);
1928         int r;
1929
1930         /* Returns 0 when we didn't do anything. This should cause the
1931          * caller to invoke sd_bus_wait() before returning the next
1932          * time. Returns > 0 when we did something, which possibly
1933          * means *ret is filled in with an unprocessed message. */
1934
1935         assert_return(bus, -EINVAL);
1936         assert_return(!bus_pid_changed(bus), -ECHILD);
1937
1938         /* We don't allow recursively invoking sd_bus_process(). */
1939         assert_return(!bus->processing, -EBUSY);
1940
1941         switch (bus->state) {
1942
1943         case BUS_UNSET:
1944         case BUS_CLOSED:
1945                 return -ENOTCONN;
1946
1947         case BUS_OPENING:
1948                 r = bus_socket_process_opening(bus);
1949                 if (r < 0)
1950                         return r;
1951                 if (ret)
1952                         *ret = NULL;
1953                 return r;
1954
1955         case BUS_AUTHENTICATING:
1956
1957                 r = bus_socket_process_authenticating(bus);
1958                 if (r < 0)
1959                         return r;
1960                 if (ret)
1961                         *ret = NULL;
1962                 return r;
1963
1964         case BUS_RUNNING:
1965         case BUS_HELLO:
1966
1967                 bus->processing = true;
1968                 r = process_running(bus, ret);
1969                 bus->processing = false;
1970
1971                 return r;
1972         }
1973
1974         assert_not_reached("Unknown state");
1975 }
1976
1977 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1978         struct pollfd p[2] = {};
1979         int r, e, n;
1980         struct timespec ts;
1981         usec_t m = (usec_t) -1;
1982
1983         assert(bus);
1984         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
1985
1986         e = sd_bus_get_events(bus);
1987         if (e < 0)
1988                 return e;
1989
1990         if (need_more)
1991                 /* The caller really needs some more data, he doesn't
1992                  * care about what's already read, or any timeouts
1993                  * except its own.*/
1994                 e |= POLLIN;
1995         else {
1996                 usec_t until;
1997                 /* The caller wants to process if there's something to
1998                  * process, but doesn't care otherwise */
1999
2000                 r = sd_bus_get_timeout(bus, &until);
2001                 if (r < 0)
2002                         return r;
2003                 if (r > 0) {
2004                         usec_t nw;
2005                         nw = now(CLOCK_MONOTONIC);
2006                         m = until > nw ? until - nw : 0;
2007                 }
2008         }
2009
2010         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2011                 m = timeout_usec;
2012
2013         p[0].fd = bus->input_fd;
2014         if (bus->output_fd == bus->input_fd) {
2015                 p[0].events = e;
2016                 n = 1;
2017         } else {
2018                 p[0].events = e & POLLIN;
2019                 p[1].fd = bus->output_fd;
2020                 p[1].events = e & POLLOUT;
2021                 n = 2;
2022         }
2023
2024         r = ppoll(p, n, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2025         if (r < 0)
2026                 return -errno;
2027
2028         return r > 0 ? 1 : 0;
2029 }
2030
2031 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2032
2033         assert_return(bus, -EINVAL);
2034         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2035         assert_return(!bus_pid_changed(bus), -ECHILD);
2036
2037         if (bus->rqueue_size > 0)
2038                 return 0;
2039
2040         return bus_poll(bus, false, timeout_usec);
2041 }
2042
2043 int sd_bus_flush(sd_bus *bus) {
2044         int r;
2045
2046         assert_return(bus, -EINVAL);
2047         assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
2048         assert_return(!bus_pid_changed(bus), -ECHILD);
2049
2050         r = bus_ensure_running(bus);
2051         if (r < 0)
2052                 return r;
2053
2054         if (bus->wqueue_size <= 0)
2055                 return 0;
2056
2057         for (;;) {
2058                 r = dispatch_wqueue(bus);
2059                 if (r < 0)
2060                         return r;
2061
2062                 if (bus->wqueue_size <= 0)
2063                         return 0;
2064
2065                 r = bus_poll(bus, false, (uint64_t) -1);
2066                 if (r < 0)
2067                         return r;
2068         }
2069 }
2070
2071 int sd_bus_add_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2072         struct filter_callback *f;
2073
2074         assert_return(bus, -EINVAL);
2075         assert_return(callback, -EINVAL);
2076         assert_return(!bus_pid_changed(bus), -ECHILD);
2077
2078         f = new0(struct filter_callback, 1);
2079         if (!f)
2080                 return -ENOMEM;
2081         f->callback = callback;
2082         f->userdata = userdata;
2083
2084         bus->filter_callbacks_modified = true;
2085         LIST_PREPEND(callbacks, bus->filter_callbacks, f);
2086         return 0;
2087 }
2088
2089 int sd_bus_remove_filter(sd_bus *bus, sd_bus_message_handler_t callback, void *userdata) {
2090         struct filter_callback *f;
2091
2092         assert_return(bus, -EINVAL);
2093         assert_return(callback, -EINVAL);
2094         assert_return(!bus_pid_changed(bus), -ECHILD);
2095
2096         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2097                 if (f->callback == callback && f->userdata == userdata) {
2098                         bus->filter_callbacks_modified = true;
2099                         LIST_REMOVE(callbacks, bus->filter_callbacks, f);
2100                         free(f);
2101                         return 1;
2102                 }
2103         }
2104
2105         return 0;
2106 }
2107
2108 int sd_bus_add_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2109         struct bus_match_component *components = NULL;
2110         unsigned n_components = 0;
2111         uint64_t cookie = 0;
2112         int r = 0;
2113
2114         assert_return(bus, -EINVAL);
2115         assert_return(match, -EINVAL);
2116         assert_return(!bus_pid_changed(bus), -ECHILD);
2117
2118         r = bus_match_parse(match, &components, &n_components);
2119         if (r < 0)
2120                 goto finish;
2121
2122         if (bus->bus_client) {
2123                 cookie = ++bus->match_cookie;
2124
2125                 r = bus_add_match_internal(bus, match, components, n_components, cookie);
2126                 if (r < 0)
2127                         goto finish;
2128         }
2129
2130         bus->match_callbacks_modified = true;
2131         r = bus_match_add(&bus->match_callbacks, components, n_components, callback, userdata, cookie, NULL);
2132         if (r < 0) {
2133                 if (bus->bus_client)
2134                         bus_remove_match_internal(bus, match, cookie);
2135         }
2136
2137 finish:
2138         bus_match_parse_free(components, n_components);
2139         return r;
2140 }
2141
2142 int sd_bus_remove_match(sd_bus *bus, const char *match, sd_bus_message_handler_t callback, void *userdata) {
2143         struct bus_match_component *components = NULL;
2144         unsigned n_components = 0;
2145         int r = 0, q = 0;
2146         uint64_t cookie = 0;
2147
2148         assert_return(bus, -EINVAL);
2149         assert_return(match, -EINVAL);
2150         assert_return(!bus_pid_changed(bus), -ECHILD);
2151
2152         r = bus_match_parse(match, &components, &n_components);
2153         if (r < 0)
2154                 return r;
2155
2156         bus->match_callbacks_modified = true;
2157         r = bus_match_remove(&bus->match_callbacks, components, n_components, callback, userdata, &cookie);
2158
2159         if (bus->bus_client)
2160                 q = bus_remove_match_internal(bus, match, cookie);
2161
2162         bus_match_parse_free(components, n_components);
2163
2164         return r < 0 ? r : q;
2165 }
2166
2167 bool bus_pid_changed(sd_bus *bus) {
2168         assert(bus);
2169
2170         /* We don't support people creating a bus connection and
2171          * keeping it around over a fork(). Let's complain. */
2172
2173         return bus->original_pid != getpid();
2174 }
2175
2176 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
2177         void *bus = userdata;
2178         int r;
2179
2180         assert(bus);
2181
2182         r = sd_bus_process(bus, NULL);
2183         if (r < 0)
2184                 return r;
2185
2186         return 1;
2187 }
2188
2189 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
2190         void *bus = userdata;
2191         int r;
2192
2193         assert(bus);
2194
2195         r = sd_bus_process(bus, NULL);
2196         if (r < 0)
2197                 return r;
2198
2199         return 1;
2200 }
2201
2202 static int prepare_callback(sd_event_source *s, void *userdata) {
2203         sd_bus *bus = userdata;
2204         int r, e;
2205         usec_t until;
2206
2207         assert(s);
2208         assert(bus);
2209
2210         e = sd_bus_get_events(bus);
2211         if (e < 0)
2212                 return e;
2213
2214         if (bus->output_fd != bus->input_fd) {
2215
2216                 r = sd_event_source_set_io_events(bus->input_io_event_source, e & POLLIN);
2217                 if (r < 0)
2218                         return r;
2219
2220                 r = sd_event_source_set_io_events(bus->output_io_event_source, e & POLLOUT);
2221                 if (r < 0)
2222                         return r;
2223         } else {
2224                 r = sd_event_source_set_io_events(bus->input_io_event_source, e);
2225                 if (r < 0)
2226                         return r;
2227         }
2228
2229         r = sd_bus_get_timeout(bus, &until);
2230         if (r < 0)
2231                 return r;
2232         if (r > 0) {
2233                 int j;
2234
2235                 j = sd_event_source_set_time(bus->time_event_source, until);
2236                 if (j < 0)
2237                         return j;
2238         }
2239
2240         r = sd_event_source_set_enabled(bus->time_event_source, r > 0);
2241         if (r < 0)
2242                 return r;
2243
2244         return 1;
2245 }
2246
2247 static int quit_callback(sd_event_source *event, void *userdata) {
2248         sd_bus *bus = userdata;
2249
2250         assert(event);
2251
2252         sd_bus_flush(bus);
2253
2254         return 1;
2255 }
2256
2257 int sd_bus_attach_event(sd_bus *bus, sd_event *event, int priority) {
2258         int r;
2259
2260         assert_return(bus, -EINVAL);
2261         assert_return(event, -EINVAL);
2262         assert_return(!bus->event, -EBUSY);
2263
2264         assert(!bus->input_io_event_source);
2265         assert(!bus->output_io_event_source);
2266         assert(!bus->time_event_source);
2267
2268         bus->event = sd_event_ref(event);
2269
2270         r = sd_event_add_io(event, bus->input_fd, 0, io_callback, bus, &bus->input_io_event_source);
2271         if (r < 0)
2272                 goto fail;
2273
2274         r = sd_event_source_set_priority(bus->input_io_event_source, priority);
2275         if (r < 0)
2276                 goto fail;
2277
2278         if (bus->output_fd != bus->input_fd) {
2279                 r = sd_event_add_io(event, bus->output_fd, 0, io_callback, bus, &bus->output_io_event_source);
2280                 if (r < 0)
2281                         goto fail;
2282
2283                 r = sd_event_source_set_priority(bus->output_io_event_source, priority);
2284                 if (r < 0)
2285                         goto fail;
2286         }
2287
2288         r = sd_event_source_set_prepare(bus->input_io_event_source, prepare_callback);
2289         if (r < 0)
2290                 goto fail;
2291
2292         r = sd_event_add_monotonic(event, 0, 0, time_callback, bus, &bus->time_event_source);
2293         if (r < 0)
2294                 goto fail;
2295
2296         r = sd_event_source_set_priority(bus->time_event_source, priority);
2297         if (r < 0)
2298                 goto fail;
2299
2300         r = sd_event_add_quit(event, quit_callback, bus, &bus->quit_event_source);
2301         if (r < 0)
2302                 goto fail;
2303
2304         return 0;
2305
2306 fail:
2307         sd_bus_detach_event(bus);
2308         return r;
2309 }
2310
2311 int sd_bus_detach_event(sd_bus *bus) {
2312         assert_return(bus, -EINVAL);
2313         assert_return(bus->event, -ENXIO);
2314
2315         if (bus->input_io_event_source)
2316                 bus->input_io_event_source = sd_event_source_unref(bus->input_io_event_source);
2317
2318         if (bus->output_io_event_source)
2319                 bus->output_io_event_source = sd_event_source_unref(bus->output_io_event_source);
2320
2321         if (bus->time_event_source)
2322                 bus->time_event_source = sd_event_source_unref(bus->time_event_source);
2323
2324         if (bus->quit_event_source)
2325                 bus->quit_event_source = sd_event_source_unref(bus->quit_event_source);
2326
2327         if (bus->event)
2328                 bus->event = sd_event_unref(bus->event);
2329
2330         return 0;
2331 }