chiark / gitweb /
bus: implement server mode, and anonymous authentication
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42
43 static void bus_free(sd_bus *b) {
44         struct filter_callback *f;
45         struct object_callback *c;
46         unsigned i;
47
48         assert(b);
49
50         if (b->fd >= 0)
51                 close_nointr_nofail(b->fd);
52
53         free(b->rbuffer);
54         free(b->unique_name);
55         free(b->auth_buffer);
56         free(b->address);
57
58         free(b->exec_path);
59         strv_free(b->exec_argv);
60
61         close_many(b->fds, b->n_fds);
62         free(b->fds);
63
64         for (i = 0; i < b->rqueue_size; i++)
65                 sd_bus_message_unref(b->rqueue[i]);
66         free(b->rqueue);
67
68         for (i = 0; i < b->wqueue_size; i++)
69                 sd_bus_message_unref(b->wqueue[i]);
70         free(b->wqueue);
71
72         hashmap_free_free(b->reply_callbacks);
73         prioq_free(b->reply_callbacks_prioq);
74
75         while ((f = b->filter_callbacks)) {
76                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77                 free(f);
78         }
79
80         while ((c = hashmap_steal_first(b->object_callbacks))) {
81                 free(c->path);
82                 free(c);
83         }
84
85         hashmap_free(b->object_callbacks);
86
87         free(b);
88 }
89
90 int sd_bus_new(sd_bus **ret) {
91         sd_bus *r;
92
93         if (!ret)
94                 return -EINVAL;
95
96         r = new0(sd_bus, 1);
97         if (!r)
98                 return -ENOMEM;
99
100         r->n_ref = 1;
101         r->fd = -1;
102         r->message_version = 1;
103         r->negotiate_fds = true;
104
105         /* We guarantee that wqueue always has space for at least one
106          * entry */
107         r->wqueue = new(sd_bus_message*, 1);
108         if (!r->wqueue) {
109                 free(r);
110                 return -ENOMEM;
111         }
112
113         *ret = r;
114         return 0;
115 }
116
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
118         char *a;
119
120         if (!bus)
121                 return -EINVAL;
122         if (bus->state != BUS_UNSET)
123                 return -EPERM;
124         if (!address)
125                 return -EINVAL;
126
127         a = strdup(address);
128         if (!a)
129                 return -ENOMEM;
130
131         free(bus->address);
132         bus->address = a;
133
134         return 0;
135 }
136
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
138         if (!bus)
139                 return -EINVAL;
140         if (bus->state != BUS_UNSET)
141                 return -EPERM;
142         if (fd < 0)
143                 return -EINVAL;
144
145         bus->fd = fd;
146         return 0;
147 }
148
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
150         char *p, **a;
151
152         if (!bus)
153                 return -EINVAL;
154         if (bus->state != BUS_UNSET)
155                 return -EPERM;
156         if (!path)
157                 return -EINVAL;
158         if (strv_isempty(argv))
159                 return -EINVAL;
160
161         p = strdup(path);
162         if (!p)
163                 return -ENOMEM;
164
165         a = strv_copy(argv);
166         if (!a) {
167                 free(p);
168                 return -ENOMEM;
169         }
170
171         free(bus->exec_path);
172         strv_free(bus->exec_argv);
173
174         bus->exec_path = p;
175         bus->exec_argv = a;
176
177         return 0;
178 }
179
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
181         if (!bus)
182                 return -EINVAL;
183         if (bus->state != BUS_UNSET)
184                 return -EPERM;
185
186         bus->bus_client = !!b;
187         return 0;
188 }
189
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
191         if (!bus)
192                 return -EINVAL;
193         if (bus->state != BUS_UNSET)
194                 return -EPERM;
195
196         bus->negotiate_fds = !!b;
197         return 0;
198 }
199
200 int sd_bus_set_server(sd_bus *bus, int b, sd_id128_t server) {
201         if (!bus)
202                 return -EINVAL;
203         if (!b && !sd_id128_equal(server, SD_ID128_NULL))
204                 return -EINVAL;
205         if (bus->state != BUS_UNSET)
206                 return -EPERM;
207
208         bus->is_server = !!b;
209         bus->peer = server;
210         return 0;
211 }
212
213 int sd_bus_set_anonymous(sd_bus *bus, int b) {
214         if (!bus)
215                 return -EINVAL;
216         if (bus->state != BUS_UNSET)
217                 return -EPERM;
218
219         bus->anonymous_auth = !!b;
220         return 0;
221 }
222
223 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
224         const char *s;
225         int r;
226
227         assert(bus);
228         assert(bus->state == BUS_HELLO);
229
230         if (error != 0)
231                 return -error;
232
233         assert(reply);
234
235         r = sd_bus_message_read(reply, "s", &s);
236         if (r < 0)
237                 return r;
238
239         if (!service_name_is_valid(s) || s[0] != ':')
240                 return -EBADMSG;
241
242         bus->unique_name = strdup(s);
243         if (!bus->unique_name)
244                 return -ENOMEM;
245
246         bus->state = BUS_RUNNING;
247
248         return 1;
249 }
250
251 static int bus_send_hello(sd_bus *bus) {
252         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
253         int r;
254
255         assert(bus);
256
257         if (!bus->bus_client)
258                 return 0;
259
260         r = sd_bus_message_new_method_call(
261                         bus,
262                         "org.freedesktop.DBus",
263                         "/",
264                         "org.freedesktop.DBus",
265                         "Hello",
266                         &m);
267         if (r < 0)
268                 return r;
269
270         return sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, &bus->hello_serial);
271 }
272
273 int bus_start_running(sd_bus *bus) {
274         assert(bus);
275
276         if (bus->bus_client) {
277                 bus->state = BUS_HELLO;
278                 return 1;
279         }
280
281         bus->state = BUS_RUNNING;
282         return 1;
283 }
284
285 static int parse_address_key(const char **p, const char *key, char **value) {
286         size_t l, n = 0;
287         const char *a;
288         char *r = NULL;
289
290         assert(p);
291         assert(*p);
292         assert(value);
293
294         if (key) {
295                 l = strlen(key);
296                 if (strncmp(*p, key, l) != 0)
297                         return 0;
298
299                 if ((*p)[l] != '=')
300                         return 0;
301
302                 if (*value)
303                         return -EINVAL;
304
305                 a = *p + l + 1;
306         } else
307                 a = *p;
308
309         while (*a != ';' && *a != ',' && *a != 0) {
310                 char c, *t;
311
312                 if (*a == '%') {
313                         int x, y;
314
315                         x = unhexchar(a[1]);
316                         if (x < 0) {
317                                 free(r);
318                                 return x;
319                         }
320
321                         y = unhexchar(a[2]);
322                         if (y < 0) {
323                                 free(r);
324                                 return y;
325                         }
326
327                         c = (char) ((x << 4) | y);
328                         a += 3;
329                 } else {
330                         c = *a;
331                         a++;
332                 }
333
334                 t = realloc(r, n + 2);
335                 if (!t) {
336                         free(r);
337                         return -ENOMEM;
338                 }
339
340                 r = t;
341                 r[n++] = c;
342         }
343
344         if (!r) {
345                 r = strdup("");
346                 if (!r)
347                         return -ENOMEM;
348         } else
349                 r[n] = 0;
350
351         if (*a == ',')
352                 a++;
353
354         *p = a;
355
356         free(*value);
357         *value = r;
358
359         return 1;
360 }
361
362 static void skip_address_key(const char **p) {
363         assert(p);
364         assert(*p);
365
366         *p += strcspn(*p, ",");
367
368         if (**p == ',')
369                 (*p) ++;
370 }
371
372 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
373         _cleanup_free_ char *path = NULL, *abstract = NULL;
374         size_t l;
375         int r;
376
377         assert(b);
378         assert(p);
379         assert(*p);
380         assert(guid);
381
382         while (**p != 0 && **p != ';') {
383                 r = parse_address_key(p, "guid", guid);
384                 if (r < 0)
385                         return r;
386                 else if (r > 0)
387                         continue;
388
389                 r = parse_address_key(p, "path", &path);
390                 if (r < 0)
391                         return r;
392                 else if (r > 0)
393                         continue;
394
395                 r = parse_address_key(p, "abstract", &abstract);
396                 if (r < 0)
397                         return r;
398                 else if (r > 0)
399                         continue;
400
401                 skip_address_key(p);
402         }
403
404         if (!path && !abstract)
405                 return -EINVAL;
406
407         if (path && abstract)
408                 return -EINVAL;
409
410         if (path) {
411                 l = strlen(path);
412                 if (l > sizeof(b->sockaddr.un.sun_path))
413                         return -E2BIG;
414
415                 b->sockaddr.un.sun_family = AF_UNIX;
416                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
417                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
418         } else if (abstract) {
419                 l = strlen(abstract);
420                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
421                         return -E2BIG;
422
423                 b->sockaddr.un.sun_family = AF_UNIX;
424                 b->sockaddr.un.sun_path[0] = 0;
425                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
426                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
427         }
428
429         return 0;
430 }
431
432 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
433         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
434         struct addrinfo hints, *result;
435         int r;
436
437         assert(b);
438         assert(p);
439         assert(*p);
440         assert(guid);
441
442         while (**p != 0 && **p != ';') {
443                 r = parse_address_key(p, "guid", guid);
444                 if (r < 0)
445                         return r;
446                 else if (r > 0)
447                         continue;
448
449                 r = parse_address_key(p, "host", &host);
450                 if (r < 0)
451                         return r;
452                 else if (r > 0)
453                         continue;
454
455                 r = parse_address_key(p, "port", &port);
456                 if (r < 0)
457                         return r;
458                 else if (r > 0)
459                         continue;
460
461                 r = parse_address_key(p, "family", &family);
462                 if (r < 0)
463                         return r;
464                 else if (r > 0)
465                         continue;
466
467                 skip_address_key(p);
468         }
469
470         if (!host || !port)
471                 return -EINVAL;
472
473         zero(hints);
474         hints.ai_socktype = SOCK_STREAM;
475         hints.ai_flags = AI_ADDRCONFIG;
476
477         if (family) {
478                 if (streq(family, "ipv4"))
479                         hints.ai_family = AF_INET;
480                 else if (streq(family, "ipv6"))
481                         hints.ai_family = AF_INET6;
482                 else
483                         return -EINVAL;
484         }
485
486         r = getaddrinfo(host, port, &hints, &result);
487         if (r == EAI_SYSTEM)
488                 return -errno;
489         else if (r != 0)
490                 return -EADDRNOTAVAIL;
491
492         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
493         b->sockaddr_size = result->ai_addrlen;
494
495         freeaddrinfo(result);
496
497         return 0;
498 }
499
500 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
501         char *path = NULL;
502         unsigned n_argv = 0, j;
503         char **argv = NULL;
504         int r;
505
506         assert(b);
507         assert(p);
508         assert(*p);
509         assert(guid);
510
511         while (**p != 0 && **p != ';') {
512                 r = parse_address_key(p, "guid", guid);
513                 if (r < 0)
514                         goto fail;
515                 else if (r > 0)
516                         continue;
517
518                 r = parse_address_key(p, "path", &path);
519                 if (r < 0)
520                         goto fail;
521                 else if (r > 0)
522                         continue;
523
524                 if (startswith(*p, "argv")) {
525                         unsigned ul;
526
527                         errno = 0;
528                         ul = strtoul(*p + 4, (char**) p, 10);
529                         if (errno > 0 || **p != '=' || ul > 256) {
530                                 r = -EINVAL;
531                                 goto fail;
532                         }
533
534                         (*p) ++;
535
536                         if (ul >= n_argv) {
537                                 char **x;
538
539                                 x = realloc(argv, sizeof(char*) * (ul + 2));
540                                 if (!x) {
541                                         r = -ENOMEM;
542                                         goto fail;
543                                 }
544
545                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
546
547                                 argv = x;
548                                 n_argv = ul + 1;
549                         }
550
551                         r = parse_address_key(p, NULL, argv + ul);
552                         if (r < 0)
553                                 goto fail;
554
555                         continue;
556                 }
557
558                 skip_address_key(p);
559         }
560
561         if (!path) {
562                 r = -EINVAL;
563                 goto fail;
564         }
565
566         /* Make sure there are no holes in the array, with the
567          * exception of argv[0] */
568         for (j = 1; j < n_argv; j++)
569                 if (!argv[j]) {
570                         r = -EINVAL;
571                         goto fail;
572                 }
573
574         if (argv && argv[0] == NULL) {
575                 argv[0] = strdup(path);
576                 if (!argv[0]) {
577                         r = -ENOMEM;
578                         goto fail;
579                 }
580         }
581
582         b->exec_path = path;
583         b->exec_argv = argv;
584         return 0;
585
586 fail:
587         for (j = 0; j < n_argv; j++)
588                 free(argv[j]);
589
590         free(argv);
591         free(path);
592         return r;
593 }
594
595 static void bus_reset_parsed_address(sd_bus *b) {
596         assert(b);
597
598         zero(b->sockaddr);
599         b->sockaddr_size = 0;
600         strv_free(b->exec_argv);
601         free(b->exec_path);
602         b->exec_path = NULL;
603         b->exec_argv = NULL;
604         b->peer = SD_ID128_NULL;
605 }
606
607 static int bus_parse_next_address(sd_bus *b) {
608         _cleanup_free_ char *guid = NULL;
609         const char *a;
610         int r;
611
612         assert(b);
613
614         if (!b->address)
615                 return 0;
616         if (b->address[b->address_index] == 0)
617                 return 0;
618
619         bus_reset_parsed_address(b);
620
621         a = b->address + b->address_index;
622
623         while (*a != 0) {
624
625                 if (*a == ';') {
626                         a++;
627                         continue;
628                 }
629
630                 if (startswith(a, "unix:")) {
631                         a += 5;
632
633                         r = parse_unix_address(b, &a, &guid);
634                         if (r < 0)
635                                 return r;
636                         break;
637
638                 } else if (startswith(a, "tcp:")) {
639
640                         a += 4;
641                         r = parse_tcp_address(b, &a, &guid);
642                         if (r < 0)
643                                 return r;
644
645                         break;
646
647                 } else if (startswith(a, "unixexec:")) {
648
649                         a += 9;
650                         r = parse_exec_address(b, &a, &guid);
651                         if (r < 0)
652                                 return r;
653
654                         break;
655
656                 }
657
658                 a = strchr(a, ';');
659                 if (!a)
660                         return 0;
661         }
662
663         if (guid) {
664                 r = sd_id128_from_string(guid, &b->peer);
665                 if (r < 0)
666                         return r;
667         }
668
669         b->address_index = a - b->address;
670         return 1;
671 }
672
673 static int bus_start_address(sd_bus *b) {
674         int r;
675
676         assert(b);
677
678         for (;;) {
679                 if (b->fd >= 0) {
680                         close_nointr_nofail(b->fd);
681                         b->fd = -1;
682                 }
683
684                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
685
686                         r = bus_socket_connect(b);
687                         if (r >= 0)
688                                 return r;
689
690                         b->last_connect_error = -r;
691
692                 } else if (b->exec_path) {
693
694                         r = bus_socket_exec(b);
695                         if (r >= 0)
696                                 return r;
697
698                         b->last_connect_error = -r;
699                 }
700
701                 r = bus_parse_next_address(b);
702                 if (r < 0)
703                         return r;
704                 if (r == 0)
705                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
706         }
707 }
708
709 int bus_next_address(sd_bus *b) {
710         assert(b);
711
712         bus_reset_parsed_address(b);
713         return bus_start_address(b);
714 }
715
716 static int bus_start_fd(sd_bus *b) {
717         int r;
718
719         assert(b);
720
721         r = fd_nonblock(b->fd, true);
722         if (r < 0)
723                 return r;
724
725         r = fd_cloexec(b->fd, true);
726         if (r < 0)
727                 return r;
728
729         return bus_socket_take_fd(b);
730 }
731
732 int sd_bus_start(sd_bus *bus) {
733         int r;
734
735         if (!bus)
736                 return -EINVAL;
737         if (bus->state != BUS_UNSET)
738                 return -EPERM;
739
740         bus->state = BUS_OPENING;
741
742         if (bus->is_server && bus->bus_client)
743                 return -EINVAL;
744
745         if (bus->fd >= 0)
746                 r = bus_start_fd(bus);
747         else if (bus->address || bus->sockaddr.sa.sa_family != AF_UNSPEC || bus->exec_path)
748                 r = bus_start_address(bus);
749         else
750                 return -EINVAL;
751
752         if (r < 0)
753                 return r;
754
755         return bus_send_hello(bus);
756 }
757
758 int sd_bus_open_system(sd_bus **ret) {
759         const char *e;
760         sd_bus *b;
761         int r;
762
763         if (!ret)
764                 return -EINVAL;
765
766         r = sd_bus_new(&b);
767         if (r < 0)
768                 return r;
769
770         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
771         if (e) {
772                 r = sd_bus_set_address(b, e);
773                 if (r < 0)
774                         goto fail;
775         } else {
776                 b->sockaddr.un.sun_family = AF_UNIX;
777                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
778                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
779         }
780
781         b->bus_client = true;
782
783         r = sd_bus_start(b);
784         if (r < 0)
785                 goto fail;
786
787         *ret = b;
788         return 0;
789
790 fail:
791         bus_free(b);
792         return r;
793 }
794
795 int sd_bus_open_user(sd_bus **ret) {
796         const char *e;
797         sd_bus *b;
798         size_t l;
799         int r;
800
801         if (!ret)
802                 return -EINVAL;
803
804         r = sd_bus_new(&b);
805         if (r < 0)
806                 return r;
807
808         e = getenv("DBUS_SESSION_BUS_ADDRESS");
809         if (e) {
810                 r = sd_bus_set_address(b, e);
811                 if (r < 0)
812                         goto fail;
813         } else {
814                 e = getenv("XDG_RUNTIME_DIR");
815                 if (!e) {
816                         r = -ENOENT;
817                         goto fail;
818                 }
819
820                 l = strlen(e);
821                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
822                         r = -E2BIG;
823                         goto fail;
824                 }
825
826                 b->sockaddr.un.sun_family = AF_UNIX;
827                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
828                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
829         }
830
831         b->bus_client = true;
832
833         r = sd_bus_start(b);
834         if (r < 0)
835                 goto fail;
836
837         *ret = b;
838         return 0;
839
840 fail:
841         bus_free(b);
842         return r;
843 }
844
845 void sd_bus_close(sd_bus *bus) {
846         if (!bus)
847                 return;
848         if (bus->fd < 0)
849                 return;
850
851         close_nointr_nofail(bus->fd);
852         bus->fd = -1;
853 }
854
855 sd_bus *sd_bus_ref(sd_bus *bus) {
856         if (!bus)
857                 return NULL;
858
859         assert(bus->n_ref > 0);
860
861         bus->n_ref++;
862         return bus;
863 }
864
865 sd_bus *sd_bus_unref(sd_bus *bus) {
866         if (!bus)
867                 return NULL;
868
869         assert(bus->n_ref > 0);
870         bus->n_ref--;
871
872         if (bus->n_ref <= 0)
873                 bus_free(bus);
874
875         return NULL;
876 }
877
878 int sd_bus_is_open(sd_bus *bus) {
879         if (!bus)
880                 return -EINVAL;
881
882         return bus->state != BUS_UNSET && bus->fd >= 0;
883 }
884
885 int sd_bus_can_send(sd_bus *bus, char type) {
886         int r;
887
888         if (!bus)
889                 return -EINVAL;
890         if (bus->fd < 0)
891                 return -ENOTCONN;
892
893         if (type == SD_BUS_TYPE_UNIX_FD) {
894                 if (!bus->negotiate_fds)
895                         return 0;
896
897                 r = bus_ensure_running(bus);
898                 if (r < 0)
899                         return r;
900
901                 return bus->can_fds;
902         }
903
904         return bus_type_is_valid(type);
905 }
906
907 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
908         int r;
909
910         if (!bus)
911                 return -EINVAL;
912         if (!peer)
913                 return -EINVAL;
914
915         r = bus_ensure_running(bus);
916         if (r < 0)
917                 return r;
918
919         *peer = bus->peer;
920         return 0;
921 }
922
923 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
924         assert(m);
925
926         if (m->header->version > b->message_version)
927                 return -EPERM;
928
929         if (m->sealed)
930                 return 0;
931
932         return bus_message_seal(m, ++b->serial);
933 }
934
935 static int dispatch_wqueue(sd_bus *bus) {
936         int r, ret = 0;
937
938         assert(bus);
939         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
940
941         if (bus->fd < 0)
942                 return -ENOTCONN;
943
944         while (bus->wqueue_size > 0) {
945
946                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
947                 if (r < 0) {
948                         sd_bus_close(bus);
949                         return r;
950                 } else if (r == 0)
951                         /* Didn't do anything this time */
952                         return ret;
953                 else if (bus->windex >= bus->wqueue[0]->size) {
954                         /* Fully written. Let's drop the entry from
955                          * the queue.
956                          *
957                          * This isn't particularly optimized, but
958                          * well, this is supposed to be our worst-case
959                          * buffer only, and the socket buffer is
960                          * supposed to be our primary buffer, and if
961                          * it got full, then all bets are off
962                          * anyway. */
963
964                         sd_bus_message_unref(bus->wqueue[0]);
965                         bus->wqueue_size --;
966                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
967                         bus->windex = 0;
968
969                         ret = 1;
970                 }
971         }
972
973         return ret;
974 }
975
976 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
977         sd_bus_message *z = NULL;
978         int r, ret = 0;
979
980         assert(bus);
981         assert(m);
982         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
983
984         if (bus->fd < 0)
985                 return -ENOTCONN;
986
987         if (bus->rqueue_size > 0) {
988                 /* Dispatch a queued message */
989
990                 *m = bus->rqueue[0];
991                 bus->rqueue_size --;
992                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
993                 return 1;
994         }
995
996         /* Try to read a new message */
997         do {
998                 r = bus_socket_read_message(bus, &z);
999                 if (r < 0) {
1000                         sd_bus_close(bus);
1001                         return r;
1002                 }
1003                 if (r == 0)
1004                         return ret;
1005
1006                 r = 1;
1007         } while (!z);
1008
1009         *m = z;
1010         return 1;
1011 }
1012
1013 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1014         int r;
1015
1016         if (!bus)
1017                 return -EINVAL;
1018         if (bus->state == BUS_UNSET)
1019                 return -ENOTCONN;
1020         if (bus->fd < 0)
1021                 return -ENOTCONN;
1022         if (!m)
1023                 return -EINVAL;
1024
1025         if (m->n_fds > 0) {
1026                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1027                 if (r < 0)
1028                         return r;
1029                 if (r == 0)
1030                         return -ENOTSUP;
1031         }
1032
1033         /* If the serial number isn't kept, then we know that no reply
1034          * is expected */
1035         if (!serial && !m->sealed)
1036                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1037
1038         r = bus_seal_message(bus, m);
1039         if (r < 0)
1040                 return r;
1041
1042         /* If this is a reply and no reply was requested, then let's
1043          * suppress this, if we can */
1044         if (m->dont_send && !serial)
1045                 return 0;
1046
1047         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1048                 size_t idx = 0;
1049
1050                 r = bus_socket_write_message(bus, m, &idx);
1051                 if (r < 0) {
1052                         sd_bus_close(bus);
1053                         return r;
1054                 } else if (idx < m->size)  {
1055                         /* Wasn't fully written. So let's remember how
1056                          * much was written. Note that the first entry
1057                          * of the wqueue array is always allocated so
1058                          * that we always can remember how much was
1059                          * written. */
1060                         bus->wqueue[0] = sd_bus_message_ref(m);
1061                         bus->wqueue_size = 1;
1062                         bus->windex = idx;
1063                 }
1064         } else {
1065                 sd_bus_message **q;
1066
1067                 /* Just append it to the queue. */
1068
1069                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1070                         return -ENOBUFS;
1071
1072                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1073                 if (!q)
1074                         return -ENOMEM;
1075
1076                 bus->wqueue = q;
1077                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1078         }
1079
1080         if (serial)
1081                 *serial = BUS_MESSAGE_SERIAL(m);
1082
1083         return 0;
1084 }
1085
1086 static usec_t calc_elapse(uint64_t usec) {
1087         if (usec == (uint64_t) -1)
1088                 return 0;
1089
1090         if (usec == 0)
1091                 usec = BUS_DEFAULT_TIMEOUT;
1092
1093         return now(CLOCK_MONOTONIC) + usec;
1094 }
1095
1096 static int timeout_compare(const void *a, const void *b) {
1097         const struct reply_callback *x = a, *y = b;
1098
1099         if (x->timeout != 0 && y->timeout == 0)
1100                 return -1;
1101
1102         if (x->timeout == 0 && y->timeout != 0)
1103                 return 1;
1104
1105         if (x->timeout < y->timeout)
1106                 return -1;
1107
1108         if (x->timeout > y->timeout)
1109                 return 1;
1110
1111         return 0;
1112 }
1113
1114 int sd_bus_send_with_reply(
1115                 sd_bus *bus,
1116                 sd_bus_message *m,
1117                 sd_message_handler_t callback,
1118                 void *userdata,
1119                 uint64_t usec,
1120                 uint64_t *serial) {
1121
1122         struct reply_callback *c;
1123         int r;
1124
1125         if (!bus)
1126                 return -EINVAL;
1127         if (bus->state == BUS_UNSET)
1128                 return -ENOTCONN;
1129         if (bus->fd < 0)
1130                 return -ENOTCONN;
1131         if (!m)
1132                 return -EINVAL;
1133         if (!callback)
1134                 return -EINVAL;
1135         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1136                 return -EINVAL;
1137         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1138                 return -EINVAL;
1139
1140         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1141         if (r < 0)
1142                 return r;
1143
1144         if (usec != (uint64_t) -1) {
1145                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1146                 if (r < 0)
1147                         return r;
1148         }
1149
1150         r = bus_seal_message(bus, m);
1151         if (r < 0)
1152                 return r;
1153
1154         c = new(struct reply_callback, 1);
1155         if (!c)
1156                 return -ENOMEM;
1157
1158         c->callback = callback;
1159         c->userdata = userdata;
1160         c->serial = BUS_MESSAGE_SERIAL(m);
1161         c->timeout = calc_elapse(usec);
1162
1163         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1164         if (r < 0) {
1165                 free(c);
1166                 return r;
1167         }
1168
1169         if (c->timeout != 0) {
1170                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1171                 if (r < 0) {
1172                         c->timeout = 0;
1173                         sd_bus_send_with_reply_cancel(bus, c->serial);
1174                         return r;
1175                 }
1176         }
1177
1178         r = sd_bus_send(bus, m, serial);
1179         if (r < 0) {
1180                 sd_bus_send_with_reply_cancel(bus, c->serial);
1181                 return r;
1182         }
1183
1184         return r;
1185 }
1186
1187 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1188         struct reply_callback *c;
1189
1190         if (!bus)
1191                 return -EINVAL;
1192         if (serial == 0)
1193                 return -EINVAL;
1194
1195         c = hashmap_remove(bus->reply_callbacks, &serial);
1196         if (!c)
1197                 return 0;
1198
1199         if (c->timeout != 0)
1200                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1201
1202         free(c);
1203         return 1;
1204 }
1205
1206 int bus_ensure_running(sd_bus *bus) {
1207         int r;
1208
1209         assert(bus);
1210
1211         if (bus->fd < 0)
1212                 return -ENOTCONN;
1213         if (bus->state == BUS_UNSET)
1214                 return -ENOTCONN;
1215
1216         if (bus->state == BUS_RUNNING)
1217                 return 1;
1218
1219         for (;;) {
1220                 r = sd_bus_process(bus, NULL);
1221                 if (r < 0)
1222                         return r;
1223                 if (bus->state == BUS_RUNNING)
1224                         return 1;
1225                 if (r > 0)
1226                         continue;
1227
1228                 r = sd_bus_wait(bus, (uint64_t) -1);
1229                 if (r < 0)
1230                         return r;
1231         }
1232 }
1233
1234 int sd_bus_send_with_reply_and_block(
1235                 sd_bus *bus,
1236                 sd_bus_message *m,
1237                 uint64_t usec,
1238                 sd_bus_error *error,
1239                 sd_bus_message **reply) {
1240
1241         int r;
1242         usec_t timeout;
1243         uint64_t serial;
1244         bool room = false;
1245
1246         if (!bus)
1247                 return -EINVAL;
1248         if (bus->fd < 0)
1249                 return -ENOTCONN;
1250         if (bus->state == BUS_UNSET)
1251                 return -ENOTCONN;
1252         if (!m)
1253                 return -EINVAL;
1254         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1255                 return -EINVAL;
1256         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1257                 return -EINVAL;
1258         if (bus_error_is_dirty(error))
1259                 return -EINVAL;
1260
1261         r = bus_ensure_running(bus);
1262         if (r < 0)
1263                 return r;
1264
1265         r = sd_bus_send(bus, m, &serial);
1266         if (r < 0)
1267                 return r;
1268
1269         timeout = calc_elapse(usec);
1270
1271         for (;;) {
1272                 usec_t left;
1273                 sd_bus_message *incoming = NULL;
1274
1275                 if (!room) {
1276                         sd_bus_message **q;
1277
1278                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1279                                 return -ENOBUFS;
1280
1281                         /* Make sure there's room for queuing this
1282                          * locally, before we read the message */
1283
1284                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1285                         if (!q)
1286                                 return -ENOMEM;
1287
1288                         bus->rqueue = q;
1289                         room = true;
1290                 }
1291
1292                 r = bus_socket_read_message(bus, &incoming);
1293                 if (r < 0)
1294                         return r;
1295                 if (incoming) {
1296
1297                         if (incoming->reply_serial == serial) {
1298                                 /* Found a match! */
1299
1300                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1301                                         *reply = incoming;
1302                                         return 0;
1303                                 }
1304
1305                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1306                                         int k;
1307
1308                                         r = sd_bus_error_copy(error, &incoming->error);
1309                                         if (r < 0) {
1310                                                 sd_bus_message_unref(incoming);
1311                                                 return r;
1312                                         }
1313
1314                                         k = bus_error_to_errno(&incoming->error);
1315                                         sd_bus_message_unref(incoming);
1316                                         return k;
1317                                 }
1318
1319                                 sd_bus_message_unref(incoming);
1320                                 return -EIO;
1321                         }
1322
1323                         /* There's already guaranteed to be room for
1324                          * this, so need to resize things here */
1325                         bus->rqueue[bus->rqueue_size ++] = incoming;
1326                         room = false;
1327
1328                         /* Try to read more, right-away */
1329                         continue;
1330                 }
1331                 if (r != 0)
1332                         continue;
1333
1334                 if (timeout > 0) {
1335                         usec_t n;
1336
1337                         n = now(CLOCK_MONOTONIC);
1338                         if (n >= timeout)
1339                                 return -ETIMEDOUT;
1340
1341                         left = timeout - n;
1342                 } else
1343                         left = (uint64_t) -1;
1344
1345                 r = bus_poll(bus, true, left);
1346                 if (r < 0)
1347                         return r;
1348
1349                 r = dispatch_wqueue(bus);
1350                 if (r < 0)
1351                         return r;
1352         }
1353 }
1354
1355 int sd_bus_get_fd(sd_bus *bus) {
1356         if (!bus)
1357                 return -EINVAL;
1358
1359         if (bus->fd < 0)
1360                 return -ENOTCONN;
1361
1362         return bus->fd;
1363 }
1364
1365 int sd_bus_get_events(sd_bus *bus) {
1366         int flags = 0;
1367
1368         if (!bus)
1369                 return -EINVAL;
1370         if (bus->state == BUS_UNSET)
1371                 return -ENOTCONN;
1372         if (bus->fd < 0)
1373                 return -ENOTCONN;
1374
1375         if (bus->state == BUS_OPENING)
1376                 flags |= POLLOUT;
1377         else if (bus->state == BUS_AUTHENTICATING) {
1378
1379                 if (bus_socket_auth_needs_write(bus))
1380                         flags |= POLLOUT;
1381
1382                 flags |= POLLIN;
1383
1384         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1385                 if (bus->rqueue_size <= 0)
1386                         flags |= POLLIN;
1387                 if (bus->wqueue_size > 0)
1388                         flags |= POLLOUT;
1389         }
1390
1391         return flags;
1392 }
1393
1394 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1395         struct reply_callback *c;
1396
1397         if (!bus)
1398                 return -EINVAL;
1399         if (!timeout_usec)
1400                 return -EINVAL;
1401         if (bus->state == BUS_UNSET)
1402                 return -ENOTCONN;
1403         if (bus->fd < 0)
1404                 return -ENOTCONN;
1405
1406         if (bus->state == BUS_AUTHENTICATING) {
1407                 *timeout_usec = bus->auth_timeout;
1408                 return 1;
1409         }
1410
1411         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1412                 return 0;
1413
1414         c = prioq_peek(bus->reply_callbacks_prioq);
1415         if (!c)
1416                 return 0;
1417
1418         *timeout_usec = c->timeout;
1419         return 1;
1420 }
1421
1422 static int process_timeout(sd_bus *bus) {
1423         struct reply_callback *c;
1424         usec_t n;
1425         int r;
1426
1427         assert(bus);
1428
1429         c = prioq_peek(bus->reply_callbacks_prioq);
1430         if (!c)
1431                 return 0;
1432
1433         n = now(CLOCK_MONOTONIC);
1434         if (c->timeout > n)
1435                 return 0;
1436
1437         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1438         hashmap_remove(bus->reply_callbacks, &c->serial);
1439
1440         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1441         free(c);
1442
1443         return r < 0 ? r : 1;
1444 }
1445
1446 static int process_hello(sd_bus *bus, sd_bus_message *m) {
1447         assert(bus);
1448         assert(m);
1449
1450         if (bus->state != BUS_HELLO)
1451                 return 0;
1452
1453         /* Let's make sure the first message on the bus is the HELLO
1454          * reply. But note that we don't actually parse the message
1455          * here (we leave that to the usual handling), we just verify
1456          * we don't let any earlier msg through. */
1457
1458         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1459             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1460                 return -EIO;
1461
1462         if (m->reply_serial != bus->hello_serial)
1463                 return -EIO;
1464
1465         return 0;
1466 }
1467
1468 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1469         struct reply_callback *c;
1470         int r;
1471
1472         assert(bus);
1473         assert(m);
1474
1475         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1476             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1477                 return 0;
1478
1479         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1480         if (!c)
1481                 return 0;
1482
1483         if (c->timeout != 0)
1484                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1485
1486         r = c->callback(bus, 0, m, c->userdata);
1487         free(c);
1488
1489         return r;
1490 }
1491
1492 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1493         struct filter_callback *l;
1494         int r;
1495
1496         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1497                 r = l->callback(bus, 0, m, l->userdata);
1498                 if (r != 0)
1499                         return r;
1500         }
1501
1502         return 0;
1503 }
1504
1505 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1506         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1507         int r;
1508
1509         assert(bus);
1510         assert(m);
1511
1512         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1513                 return 0;
1514
1515         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1516                 return 0;
1517
1518         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1519                 return 1;
1520
1521         if (streq_ptr(m->member, "Ping"))
1522                 r = sd_bus_message_new_method_return(bus, m, &reply);
1523         else if (streq_ptr(m->member, "GetMachineId")) {
1524                 sd_id128_t id;
1525                 char sid[33];
1526
1527                 r = sd_id128_get_machine(&id);
1528                 if (r < 0)
1529                         return r;
1530
1531                 r = sd_bus_message_new_method_return(bus, m, &reply);
1532                 if (r < 0)
1533                         return r;
1534
1535                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1536         } else {
1537                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1538
1539                 sd_bus_error_set(&error,
1540                                  "org.freedesktop.DBus.Error.UnknownMethod",
1541                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1542
1543                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1544         }
1545
1546         if (r < 0)
1547                 return r;
1548
1549         r = sd_bus_send(bus, reply, NULL);
1550         if (r < 0)
1551                 return r;
1552
1553         return 1;
1554 }
1555
1556 static int process_object(sd_bus *bus, sd_bus_message *m) {
1557         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1558         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1559         struct object_callback *c;
1560         char *p;
1561         int r;
1562         bool found = false;
1563
1564         assert(bus);
1565         assert(m);
1566
1567         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1568                 return 0;
1569
1570         if (hashmap_isempty(bus->object_callbacks))
1571                 return 0;
1572
1573         c = hashmap_get(bus->object_callbacks, m->path);
1574         if (c) {
1575                 r = c->callback(bus, 0, m, c->userdata);
1576                 if (r != 0)
1577                         return r;
1578
1579                 found = true;
1580         }
1581
1582         /* Look for fallback prefixes */
1583         p = strdupa(m->path);
1584         for (;;) {
1585                 char *e;
1586
1587                 e = strrchr(p, '/');
1588                 if (e == p || !e)
1589                         break;
1590
1591                 *e = 0;
1592
1593                 c = hashmap_get(bus->object_callbacks, p);
1594                 if (c && c->is_fallback) {
1595                         r = c->callback(bus, 0, m, c->userdata);
1596                         if (r != 0)
1597                                 return r;
1598
1599                         found = true;
1600                 }
1601         }
1602
1603         /* We found some handlers but none wanted to take this, then
1604          * return this -- with one exception, we can handle
1605          * introspection minimally ourselves */
1606         if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1607                 return 0;
1608
1609         sd_bus_error_set(&error,
1610                          "org.freedesktop.DBus.Error.UnknownMethod",
1611                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1612
1613         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1614         if (r < 0)
1615                 return r;
1616
1617         r = sd_bus_send(bus, reply, NULL);
1618         if (r < 0)
1619                 return r;
1620
1621         return 1;
1622 }
1623
1624 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1625         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1626         _cleanup_free_ char *introspection = NULL;
1627         _cleanup_set_free_free_ Set *s = NULL;
1628         _cleanup_fclose_ FILE *f = NULL;
1629         struct object_callback *c;
1630         Iterator i;
1631         size_t size = 0;
1632         char *node;
1633         int r;
1634
1635         assert(bus);
1636         assert(m);
1637
1638         if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1639                 return 0;
1640
1641         if (!m->path)
1642                 return 0;
1643
1644         s = set_new(string_hash_func, string_compare_func);
1645         if (!s)
1646                 return -ENOMEM;
1647
1648         HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1649                 const char *e;
1650                 char *a, *p;
1651
1652                 if (streq(c->path, "/"))
1653                         continue;
1654
1655                 if (streq(m->path, "/"))
1656                         e = c->path;
1657                 else {
1658                         e = startswith(c->path, m->path);
1659                         if (!e || *e != '/')
1660                                 continue;
1661                 }
1662
1663                 a = strdup(e+1);
1664                 if (!a)
1665                         return -ENOMEM;
1666
1667                 p = strchr(a, '/');
1668                 if (p)
1669                         *p = 0;
1670
1671                 r = set_put(s, a);
1672                 if (r < 0) {
1673                         free(a);
1674
1675                         if (r != -EEXIST)
1676                                 return r;
1677                 }
1678         }
1679
1680         f = open_memstream(&introspection, &size);
1681         if (!f)
1682                 return -ENOMEM;
1683
1684         fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1685         fputs("<node>\n", f);
1686         fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1687         fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1688
1689         while ((node = set_steal_first(s))) {
1690                 fprintf(f, " <node name=\"%s\"/>\n", node);
1691                 free(node);
1692         }
1693
1694         fputs("</node>\n", f);
1695
1696         fflush(f);
1697
1698         if (ferror(f))
1699                 return -ENOMEM;
1700
1701         r = sd_bus_message_new_method_return(bus, m, &reply);
1702         if (r < 0)
1703                 return r;
1704
1705         r = sd_bus_message_append(reply, "s", introspection);
1706         if (r < 0)
1707                 return r;
1708
1709         r = sd_bus_send(bus, reply, NULL);
1710         if (r < 0)
1711                 return r;
1712
1713         return 1;
1714 }
1715
1716 static int process_message(sd_bus *bus, sd_bus_message *m) {
1717         int r;
1718
1719         assert(bus);
1720         assert(m);
1721
1722         r = process_hello(bus, m);
1723         if (r != 0)
1724                 return r;
1725
1726         r = process_reply(bus, m);
1727         if (r != 0)
1728                 return r;
1729
1730         r = process_filter(bus, m);
1731         if (r != 0)
1732                 return r;
1733
1734         r = process_builtin(bus, m);
1735         if (r != 0)
1736                 return r;
1737
1738         r = process_object(bus, m);
1739         if (r != 0)
1740                 return r;
1741
1742         return process_introspect(bus, m);
1743 }
1744
1745 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1746         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1747         int r;
1748
1749         assert(bus);
1750         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1751
1752         r = process_timeout(bus);
1753         if (r != 0)
1754                 goto null_message;
1755
1756         r = dispatch_wqueue(bus);
1757         if (r != 0)
1758                 goto null_message;
1759
1760         r = dispatch_rqueue(bus, &m);
1761         if (r < 0)
1762                 return r;
1763         if (!m)
1764                 goto null_message;
1765
1766         r = process_message(bus, m);
1767         if (r != 0)
1768                 goto null_message;
1769
1770         if (ret) {
1771                 *ret = m;
1772                 m = NULL;
1773                 return 1;
1774         }
1775
1776         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1777                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1778                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1779
1780                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1781
1782                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1783                 if (r < 0)
1784                         return r;
1785
1786                 r = sd_bus_send(bus, reply, NULL);
1787                 if (r < 0)
1788                         return r;
1789         }
1790
1791         return 1;
1792
1793 null_message:
1794         if (r >= 0 && ret)
1795                 *ret = NULL;
1796
1797         return r;
1798 }
1799
1800 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1801         int r;
1802
1803         /* Returns 0 when we didn't do anything. This should cause the
1804          * caller to invoke sd_bus_wait() before returning the next
1805          * time. Returns > 0 when we did something, which possibly
1806          * means *ret is filled in with an unprocessed message. */
1807
1808         if (!bus)
1809                 return -EINVAL;
1810         if (bus->fd < 0)
1811                 return -ENOTCONN;
1812
1813         switch (bus->state) {
1814
1815         case BUS_UNSET:
1816                 return -ENOTCONN;
1817
1818         case BUS_OPENING:
1819                 r = bus_socket_process_opening(bus);
1820                 if (r < 0)
1821                         return r;
1822                 if (ret)
1823                         *ret = NULL;
1824                 return r;
1825
1826         case BUS_AUTHENTICATING:
1827
1828                 r = bus_socket_process_authenticating(bus);
1829                 if (r < 0)
1830                         return r;
1831                 if (ret)
1832                         *ret = NULL;
1833                 return r;
1834
1835         case BUS_RUNNING:
1836         case BUS_HELLO:
1837
1838                 return process_running(bus, ret);
1839         }
1840
1841         assert_not_reached("Unknown state");
1842 }
1843
1844 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1845         struct pollfd p;
1846         int r, e;
1847         struct timespec ts;
1848         usec_t until, m;
1849
1850         assert(bus);
1851
1852         if (bus->fd < 0)
1853                 return -ENOTCONN;
1854
1855         e = sd_bus_get_events(bus);
1856         if (e < 0)
1857                 return e;
1858
1859         if (need_more)
1860                 e |= POLLIN;
1861
1862         r = sd_bus_get_timeout(bus, &until);
1863         if (r < 0)
1864                 return r;
1865         if (r == 0)
1866                 m = (uint64_t) -1;
1867         else {
1868                 usec_t n;
1869                 n = now(CLOCK_MONOTONIC);
1870                 m = until > n ? until - n : 0;
1871         }
1872
1873         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1874                 m = timeout_usec;
1875
1876         zero(p);
1877         p.fd = bus->fd;
1878         p.events = e;
1879
1880         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1881         if (r < 0)
1882                 return -errno;
1883
1884         return r > 0 ? 1 : 0;
1885 }
1886
1887 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1888
1889         if (!bus)
1890                 return -EINVAL;
1891         if (bus->state == BUS_UNSET)
1892                 return -ENOTCONN;
1893         if (bus->fd < 0)
1894                 return -ENOTCONN;
1895         if (bus->rqueue_size > 0)
1896                 return 0;
1897
1898         return bus_poll(bus, false, timeout_usec);
1899 }
1900
1901 int sd_bus_flush(sd_bus *bus) {
1902         int r;
1903
1904         if (!bus)
1905                 return -EINVAL;
1906         if (bus->state == BUS_UNSET)
1907                 return -ENOTCONN;
1908         if (bus->fd < 0)
1909                 return -ENOTCONN;
1910
1911         r = bus_ensure_running(bus);
1912         if (r < 0)
1913                 return r;
1914
1915         if (bus->wqueue_size <= 0)
1916                 return 0;
1917
1918         for (;;) {
1919                 r = dispatch_wqueue(bus);
1920                 if (r < 0)
1921                         return r;
1922
1923                 if (bus->wqueue_size <= 0)
1924                         return 0;
1925
1926                 r = bus_poll(bus, false, (uint64_t) -1);
1927                 if (r < 0)
1928                         return r;
1929         }
1930 }
1931
1932 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1933         struct filter_callback *f;
1934
1935         if (!bus)
1936                 return -EINVAL;
1937         if (!callback)
1938                 return -EINVAL;
1939
1940         f = new(struct filter_callback, 1);
1941         if (!f)
1942                 return -ENOMEM;
1943         f->callback = callback;
1944         f->userdata = userdata;
1945
1946         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1947         return 0;
1948 }
1949
1950 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1951         struct filter_callback *f;
1952
1953         if (!bus)
1954                 return -EINVAL;
1955         if (!callback)
1956                 return -EINVAL;
1957
1958         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1959                 if (f->callback == callback && f->userdata == userdata) {
1960                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1961                         free(f);
1962                         return 1;
1963                 }
1964         }
1965
1966         return 0;
1967 }
1968
1969 static int bus_add_object(
1970                 sd_bus *bus,
1971                 bool fallback,
1972                 const char *path,
1973                 sd_message_handler_t callback,
1974                 void *userdata) {
1975
1976         struct object_callback *c;
1977         int r;
1978
1979         if (!bus)
1980                 return -EINVAL;
1981         if (!path)
1982                 return -EINVAL;
1983         if (!callback)
1984                 return -EINVAL;
1985
1986         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1987         if (r < 0)
1988                 return r;
1989
1990         c = new(struct object_callback, 1);
1991         if (!c)
1992                 return -ENOMEM;
1993
1994         c->path = strdup(path);
1995         if (!c->path) {
1996                 free(c);
1997                 return -ENOMEM;
1998         }
1999
2000         c->callback = callback;
2001         c->userdata = userdata;
2002         c->is_fallback = fallback;
2003
2004         r = hashmap_put(bus->object_callbacks, c->path, c);
2005         if (r < 0) {
2006                 free(c->path);
2007                 free(c);
2008                 return r;
2009         }
2010
2011         return 0;
2012 }
2013
2014 static int bus_remove_object(
2015                 sd_bus *bus,
2016                 bool fallback,
2017                 const char *path,
2018                 sd_message_handler_t callback,
2019                 void *userdata) {
2020
2021         struct object_callback *c;
2022
2023         if (!bus)
2024                 return -EINVAL;
2025         if (!path)
2026                 return -EINVAL;
2027         if (!callback)
2028                 return -EINVAL;
2029
2030         c = hashmap_get(bus->object_callbacks, path);
2031         if (!c)
2032                 return 0;
2033
2034         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2035                 return 0;
2036
2037         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2038
2039         free(c->path);
2040         free(c);
2041
2042         return 1;
2043 }
2044
2045 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2046         return bus_add_object(bus, false, path, callback, userdata);
2047 }
2048
2049 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2050         return bus_remove_object(bus, false, path, callback, userdata);
2051 }
2052
2053 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2054         return bus_add_object(bus, true, prefix, callback, userdata);
2055 }
2056
2057 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2058         return bus_remove_object(bus, true, prefix, callback, userdata);
2059 }