chiark / gitweb /
bus: fix missing variable initialization
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "strv.h"
33 #include "set.h"
34
35 #include "sd-bus.h"
36 #include "bus-internal.h"
37 #include "bus-message.h"
38 #include "bus-type.h"
39 #include "bus-socket.h"
40
41 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
42
43 static void bus_free(sd_bus *b) {
44         struct filter_callback *f;
45         struct object_callback *c;
46         unsigned i;
47
48         assert(b);
49
50         if (b->fd >= 0)
51                 close_nointr_nofail(b->fd);
52
53         free(b->rbuffer);
54         free(b->unique_name);
55         free(b->auth_uid);
56         free(b->address);
57
58         free(b->exec_path);
59         strv_free(b->exec_argv);
60
61         close_many(b->fds, b->n_fds);
62         free(b->fds);
63
64         for (i = 0; i < b->rqueue_size; i++)
65                 sd_bus_message_unref(b->rqueue[i]);
66         free(b->rqueue);
67
68         for (i = 0; i < b->wqueue_size; i++)
69                 sd_bus_message_unref(b->wqueue[i]);
70         free(b->wqueue);
71
72         hashmap_free_free(b->reply_callbacks);
73         prioq_free(b->reply_callbacks_prioq);
74
75         while ((f = b->filter_callbacks)) {
76                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
77                 free(f);
78         }
79
80         while ((c = hashmap_steal_first(b->object_callbacks))) {
81                 free(c->path);
82                 free(c);
83         }
84
85         hashmap_free(b->object_callbacks);
86
87         free(b);
88 }
89
90 int sd_bus_new(sd_bus **ret) {
91         sd_bus *r;
92
93         if (!ret)
94                 return -EINVAL;
95
96         r = new0(sd_bus, 1);
97         if (!r)
98                 return -ENOMEM;
99
100         r->n_ref = 1;
101         r->fd = -1;
102         r->message_version = 1;
103         r->negotiate_fds = true;
104
105         /* We guarantee that wqueue always has space for at least one
106          * entry */
107         r->wqueue = new(sd_bus_message*, 1);
108         if (!r->wqueue) {
109                 free(r);
110                 return -ENOMEM;
111         }
112
113         *ret = r;
114         return 0;
115 }
116
117 int sd_bus_set_address(sd_bus *bus, const char *address) {
118         char *a;
119
120         if (!bus)
121                 return -EINVAL;
122         if (bus->state != BUS_UNSET)
123                 return -EPERM;
124         if (!address)
125                 return -EINVAL;
126
127         a = strdup(address);
128         if (!a)
129                 return -ENOMEM;
130
131         free(bus->address);
132         bus->address = a;
133
134         return 0;
135 }
136
137 int sd_bus_set_fd(sd_bus *bus, int fd) {
138         if (!bus)
139                 return -EINVAL;
140         if (bus->state != BUS_UNSET)
141                 return -EPERM;
142         if (fd < 0)
143                 return -EINVAL;
144
145         bus->fd = fd;
146         return 0;
147 }
148
149 int sd_bus_set_exec(sd_bus *bus, const char *path, char *const argv[]) {
150         char *p, **a;
151
152         if (!bus)
153                 return -EINVAL;
154         if (bus->state != BUS_UNSET)
155                 return -EPERM;
156         if (!path)
157                 return -EINVAL;
158         if (strv_isempty(argv))
159                 return -EINVAL;
160
161         p = strdup(path);
162         if (!p)
163                 return -ENOMEM;
164
165         a = strv_copy(argv);
166         if (!a) {
167                 free(p);
168                 return -ENOMEM;
169         }
170
171         free(bus->exec_path);
172         strv_free(bus->exec_argv);
173
174         bus->exec_path = p;
175         bus->exec_argv = a;
176
177         return 0;
178 }
179
180 int sd_bus_set_bus_client(sd_bus *bus, int b) {
181         if (!bus)
182                 return -EINVAL;
183         if (bus->state != BUS_UNSET)
184                 return -EPERM;
185
186         bus->bus_client = !!b;
187         return 0;
188 }
189
190 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
191         if (!bus)
192                 return -EINVAL;
193         if (bus->state != BUS_UNSET)
194                 return -EPERM;
195
196         bus->negotiate_fds = !!b;
197         return 0;
198 }
199
200 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
201         const char *s;
202         int r;
203
204         assert(bus);
205         assert(bus->state == BUS_HELLO);
206
207         if (error != 0)
208                 return -error;
209
210         assert(reply);
211
212         r = sd_bus_message_read(reply, "s", &s);
213         if (r < 0)
214                 return r;
215
216         if (!service_name_is_valid(s) || s[0] != ':')
217                 return -EBADMSG;
218
219         bus->unique_name = strdup(s);
220         if (!bus->unique_name)
221                 return -ENOMEM;
222
223         bus->state = BUS_RUNNING;
224
225         return 1;
226 }
227
228 static int bus_send_hello(sd_bus *bus) {
229         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
230         int r;
231
232         assert(bus);
233
234         if (!bus->bus_client)
235                 return 0;
236
237         r = sd_bus_message_new_method_call(
238                         bus,
239                         "org.freedesktop.DBus",
240                         "/",
241                         "org.freedesktop.DBus",
242                         "Hello",
243                         &m);
244         if (r < 0)
245                 return r;
246
247         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
248         if (r < 0)
249                 return r;
250
251         return r;
252 }
253
254 int bus_start_running(sd_bus *bus) {
255         assert(bus);
256
257         if (bus->bus_client) {
258                 bus->state = BUS_HELLO;
259                 return 1;
260         }
261
262         bus->state = BUS_RUNNING;
263         return 1;
264 }
265
266 static int parse_address_key(const char **p, const char *key, char **value) {
267         size_t l, n = 0;
268         const char *a;
269         char *r = NULL;
270
271         assert(p);
272         assert(*p);
273         assert(value);
274
275         if (key) {
276                 l = strlen(key);
277                 if (strncmp(*p, key, l) != 0)
278                         return 0;
279
280                 if ((*p)[l] != '=')
281                         return 0;
282
283                 if (*value)
284                         return -EINVAL;
285
286                 a = *p + l + 1;
287         } else
288                 a = *p;
289
290         while (*a != ';' && *a != ',' && *a != 0) {
291                 char c, *t;
292
293                 if (*a == '%') {
294                         int x, y;
295
296                         x = unhexchar(a[1]);
297                         if (x < 0) {
298                                 free(r);
299                                 return x;
300                         }
301
302                         y = unhexchar(a[2]);
303                         if (y < 0) {
304                                 free(r);
305                                 return y;
306                         }
307
308                         c = (char) ((x << 4) | y);
309                         a += 3;
310                 } else {
311                         c = *a;
312                         a++;
313                 }
314
315                 t = realloc(r, n + 2);
316                 if (!t) {
317                         free(r);
318                         return -ENOMEM;
319                 }
320
321                 r = t;
322                 r[n++] = c;
323         }
324
325         if (!r) {
326                 r = strdup("");
327                 if (!r)
328                         return -ENOMEM;
329         } else
330                 r[n] = 0;
331
332         if (*a == ',')
333                 a++;
334
335         *p = a;
336
337         free(*value);
338         *value = r;
339
340         return 1;
341 }
342
343 static void skip_address_key(const char **p) {
344         assert(p);
345         assert(*p);
346
347         *p += strcspn(*p, ",");
348
349         if (**p == ',')
350                 (*p) ++;
351 }
352
353 static int parse_unix_address(sd_bus *b, const char **p, char **guid) {
354         _cleanup_free_ char *path = NULL, *abstract = NULL;
355         size_t l;
356         int r;
357
358         assert(b);
359         assert(p);
360         assert(*p);
361         assert(guid);
362
363         while (**p != 0 && **p != ';') {
364                 r = parse_address_key(p, "guid", guid);
365                 if (r < 0)
366                         return r;
367                 else if (r > 0)
368                         continue;
369
370                 r = parse_address_key(p, "path", &path);
371                 if (r < 0)
372                         return r;
373                 else if (r > 0)
374                         continue;
375
376                 r = parse_address_key(p, "abstract", &abstract);
377                 if (r < 0)
378                         return r;
379                 else if (r > 0)
380                         continue;
381
382                 skip_address_key(p);
383         }
384
385         if (!path && !abstract)
386                 return -EINVAL;
387
388         if (path && abstract)
389                 return -EINVAL;
390
391         if (path) {
392                 l = strlen(path);
393                 if (l > sizeof(b->sockaddr.un.sun_path))
394                         return -E2BIG;
395
396                 b->sockaddr.un.sun_family = AF_UNIX;
397                 strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
398                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
399         } else if (abstract) {
400                 l = strlen(abstract);
401                 if (l > sizeof(b->sockaddr.un.sun_path) - 1)
402                         return -E2BIG;
403
404                 b->sockaddr.un.sun_family = AF_UNIX;
405                 b->sockaddr.un.sun_path[0] = 0;
406                 strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
407                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
408         }
409
410         return 0;
411 }
412
413 static int parse_tcp_address(sd_bus *b, const char **p, char **guid) {
414         _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
415         struct addrinfo hints, *result;
416         int r;
417
418         assert(b);
419         assert(p);
420         assert(*p);
421         assert(guid);
422
423         while (**p != 0 && **p != ';') {
424                 r = parse_address_key(p, "guid", guid);
425                 if (r < 0)
426                         return r;
427                 else if (r > 0)
428                         continue;
429
430                 r = parse_address_key(p, "host", &host);
431                 if (r < 0)
432                         return r;
433                 else if (r > 0)
434                         continue;
435
436                 r = parse_address_key(p, "port", &port);
437                 if (r < 0)
438                         return r;
439                 else if (r > 0)
440                         continue;
441
442                 r = parse_address_key(p, "family", &family);
443                 if (r < 0)
444                         return r;
445                 else if (r > 0)
446                         continue;
447
448                 skip_address_key(p);
449         }
450
451         if (!host || !port)
452                 return -EINVAL;
453
454         zero(hints);
455         hints.ai_socktype = SOCK_STREAM;
456         hints.ai_flags = AI_ADDRCONFIG;
457
458         if (family) {
459                 if (streq(family, "ipv4"))
460                         hints.ai_family = AF_INET;
461                 else if (streq(family, "ipv6"))
462                         hints.ai_family = AF_INET6;
463                 else
464                         return -EINVAL;
465         }
466
467         r = getaddrinfo(host, port, &hints, &result);
468         if (r == EAI_SYSTEM)
469                 return -errno;
470         else if (r != 0)
471                 return -EADDRNOTAVAIL;
472
473         memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
474         b->sockaddr_size = result->ai_addrlen;
475
476         freeaddrinfo(result);
477
478         return 0;
479 }
480
481 static int parse_exec_address(sd_bus *b, const char **p, char **guid) {
482         char *path = NULL;
483         unsigned n_argv = 0, j;
484         char **argv = NULL;
485         int r;
486
487         assert(b);
488         assert(p);
489         assert(*p);
490         assert(guid);
491
492         while (**p != 0 && **p != ';') {
493                 r = parse_address_key(p, "guid", guid);
494                 if (r < 0)
495                         goto fail;
496                 else if (r > 0)
497                         continue;
498
499                 r = parse_address_key(p, "path", &path);
500                 if (r < 0)
501                         goto fail;
502                 else if (r > 0)
503                         continue;
504
505                 if (startswith(*p, "argv")) {
506                         unsigned ul;
507
508                         errno = 0;
509                         ul = strtoul(*p + 4, (char**) p, 10);
510                         if (errno != 0 || **p != '=' || ul > 256) {
511                                 r = -EINVAL;
512                                 goto fail;
513                         }
514
515                         (*p) ++;
516
517                         if (ul >= n_argv) {
518                                 char **x;
519
520                                 x = realloc(argv, sizeof(char*) * (ul + 2));
521                                 if (!x) {
522                                         r = -ENOMEM;
523                                         goto fail;
524                                 }
525
526                                 memset(x + n_argv, 0, sizeof(char*) * (ul - n_argv + 2));
527
528                                 argv = x;
529                                 n_argv = ul + 1;
530                         }
531
532                         r = parse_address_key(p, NULL, argv + ul);
533                         if (r < 0)
534                                 goto fail;
535
536                         continue;
537                 }
538
539                 skip_address_key(p);
540         }
541
542         if (!path) {
543                 r = -EINVAL;
544                 goto fail;
545         }
546
547         /* Make sure there are no holes in the array, with the
548          * exception of argv[0] */
549         for (j = 1; j < n_argv; j++)
550                 if (!argv[j]) {
551                         r = -EINVAL;
552                         goto fail;
553                 }
554
555         if (argv && argv[0] == NULL) {
556                 argv[0] = strdup(path);
557                 if (!argv[0]) {
558                         r = -ENOMEM;
559                         goto fail;
560                 }
561         }
562
563         b->exec_path = path;
564         b->exec_argv = argv;
565         return 0;
566
567 fail:
568         for (j = 0; j < n_argv; j++)
569                 free(argv[j]);
570
571         free(argv);
572         free(path);
573         return r;
574 }
575
576 static void bus_reset_parsed_address(sd_bus *b) {
577         assert(b);
578
579         zero(b->sockaddr);
580         b->sockaddr_size = 0;
581         strv_free(b->exec_argv);
582         free(b->exec_path);
583         b->exec_path = NULL;
584         b->exec_argv = NULL;
585         b->peer = SD_ID128_NULL;
586 }
587
588 static int bus_parse_next_address(sd_bus *b) {
589         _cleanup_free_ char *guid = NULL;
590         const char *a;
591         int r;
592
593         assert(b);
594
595         if (!b->address)
596                 return 0;
597         if (b->address[b->address_index] == 0)
598                 return 0;
599
600         bus_reset_parsed_address(b);
601
602         a = b->address + b->address_index;
603
604         while (*a != 0) {
605
606                 if (*a == ';') {
607                         a++;
608                         continue;
609                 }
610
611                 if (startswith(a, "unix:")) {
612                         a += 5;
613
614                         r = parse_unix_address(b, &a, &guid);
615                         if (r < 0)
616                                 return r;
617                         break;
618
619                 } else if (startswith(a, "tcp:")) {
620
621                         a += 4;
622                         r = parse_tcp_address(b, &a, &guid);
623                         if (r < 0)
624                                 return r;
625
626                         break;
627
628                 } else if (startswith(a, "unixexec:")) {
629
630                         a += 9;
631                         r = parse_exec_address(b, &a, &guid);
632                         if (r < 0)
633                                 return r;
634
635                         break;
636
637                 }
638
639                 a = strchr(a, ';');
640                 if (!a)
641                         return 0;
642         }
643
644         if (guid) {
645                 r = sd_id128_from_string(guid, &b->peer);
646                 if (r < 0)
647                         return r;
648         }
649
650         b->address_index = a - b->address;
651         return 1;
652 }
653
654 static int bus_start_address(sd_bus *b) {
655         int r;
656
657         assert(b);
658
659         for (;;) {
660                 if (b->fd >= 0) {
661                         close_nointr_nofail(b->fd);
662                         b->fd = -1;
663                 }
664
665                 if (b->sockaddr.sa.sa_family != AF_UNSPEC) {
666
667                         r = bus_socket_connect(b);
668                         if (r >= 0)
669                                 return r;
670
671                         b->last_connect_error = -r;
672
673                 } else if (b->exec_path) {
674
675                         r = bus_socket_exec(b);
676                         if (r >= 0)
677                                 return r;
678
679                         b->last_connect_error = -r;
680                 }
681
682                 r = bus_parse_next_address(b);
683                 if (r < 0)
684                         return r;
685                 if (r == 0)
686                         return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
687         }
688 }
689
690 int bus_next_address(sd_bus *b) {
691         assert(b);
692
693         bus_reset_parsed_address(b);
694         return bus_start_address(b);
695 }
696
697 static int bus_start_fd(sd_bus *b) {
698         int r;
699
700         assert(b);
701
702         r = fd_nonblock(b->fd, true);
703         if (r < 0)
704                 return r;
705
706         r = fd_cloexec(b->fd, true);
707         if (r < 0)
708                 return r;
709
710         return bus_socket_take_fd(b);
711 }
712
713 int sd_bus_start(sd_bus *bus) {
714         int r;
715
716         if (!bus)
717                 return -EINVAL;
718         if (bus->state != BUS_UNSET)
719                 return -EPERM;
720
721         bus->state = BUS_OPENING;
722
723         if (bus->fd >= 0)
724                 r = bus_start_fd(bus);
725         else if (bus->address)
726                 r = bus_start_address(bus);
727         else
728                 return -EINVAL;
729
730         if (r < 0)
731                 return r;
732
733         return bus_send_hello(bus);
734 }
735
736 int sd_bus_open_system(sd_bus **ret) {
737         const char *e;
738         sd_bus *b;
739         int r;
740
741         if (!ret)
742                 return -EINVAL;
743
744         r = sd_bus_new(&b);
745         if (r < 0)
746                 return r;
747
748         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
749         if (e) {
750                 r = sd_bus_set_address(b, e);
751                 if (r < 0)
752                         goto fail;
753         } else {
754                 b->sockaddr.un.sun_family = AF_UNIX;
755                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
756                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
757         }
758
759         b->bus_client = true;
760
761         r = sd_bus_start(b);
762         if (r < 0)
763                 goto fail;
764
765         *ret = b;
766         return 0;
767
768 fail:
769         bus_free(b);
770         return r;
771 }
772
773 int sd_bus_open_user(sd_bus **ret) {
774         const char *e;
775         sd_bus *b;
776         size_t l;
777         int r;
778
779         if (!ret)
780                 return -EINVAL;
781
782         r = sd_bus_new(&b);
783         if (r < 0)
784                 return r;
785
786         e = getenv("DBUS_SESSION_BUS_ADDRESS");
787         if (e) {
788                 r = sd_bus_set_address(b, e);
789                 if (r < 0)
790                         goto fail;
791         } else {
792                 e = getenv("XDG_RUNTIME_DIR");
793                 if (!e) {
794                         r = -ENOENT;
795                         goto fail;
796                 }
797
798                 l = strlen(e);
799                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
800                         r = -E2BIG;
801                         goto fail;
802                 }
803
804                 b->sockaddr.un.sun_family = AF_UNIX;
805                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
806                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
807         }
808
809         b->bus_client = true;
810
811         r = sd_bus_start(b);
812         if (r < 0)
813                 goto fail;
814
815         *ret = b;
816         return 0;
817
818 fail:
819         bus_free(b);
820         return r;
821 }
822
823 void sd_bus_close(sd_bus *bus) {
824         if (!bus)
825                 return;
826         if (bus->fd < 0)
827                 return;
828
829         close_nointr_nofail(bus->fd);
830         bus->fd = -1;
831 }
832
833 sd_bus *sd_bus_ref(sd_bus *bus) {
834         if (!bus)
835                 return NULL;
836
837         assert(bus->n_ref > 0);
838
839         bus->n_ref++;
840         return bus;
841 }
842
843 sd_bus *sd_bus_unref(sd_bus *bus) {
844         if (!bus)
845                 return NULL;
846
847         assert(bus->n_ref > 0);
848         bus->n_ref--;
849
850         if (bus->n_ref <= 0)
851                 bus_free(bus);
852
853         return NULL;
854 }
855
856 int sd_bus_is_open(sd_bus *bus) {
857         if (!bus)
858                 return -EINVAL;
859
860         return bus->state != BUS_UNSET && bus->fd >= 0;
861 }
862
863 int sd_bus_can_send(sd_bus *bus, char type) {
864         int r;
865
866         if (!bus)
867                 return -EINVAL;
868         if (bus->fd < 0)
869                 return -ENOTCONN;
870
871         if (type == SD_BUS_TYPE_UNIX_FD) {
872                 if (!bus->negotiate_fds)
873                         return 0;
874
875                 r = bus_ensure_running(bus);
876                 if (r < 0)
877                         return r;
878
879                 return bus->can_fds;
880         }
881
882         return bus_type_is_valid(type);
883 }
884
885 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
886         int r;
887
888         if (!bus)
889                 return -EINVAL;
890         if (!peer)
891                 return -EINVAL;
892
893         r = bus_ensure_running(bus);
894         if (r < 0)
895                 return r;
896
897         *peer = bus->peer;
898         return 0;
899 }
900
901 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
902         assert(m);
903
904         if (m->header->version > b->message_version)
905                 return -EPERM;
906
907         if (m->sealed)
908                 return 0;
909
910         return bus_message_seal(m, ++b->serial);
911 }
912
913 static int dispatch_wqueue(sd_bus *bus) {
914         int r, ret = 0;
915
916         assert(bus);
917         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
918
919         if (bus->fd < 0)
920                 return -ENOTCONN;
921
922         while (bus->wqueue_size > 0) {
923
924                 r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
925                 if (r < 0) {
926                         sd_bus_close(bus);
927                         return r;
928                 } else if (r == 0)
929                         /* Didn't do anything this time */
930                         return ret;
931                 else if (bus->windex >= bus->wqueue[0]->size) {
932                         /* Fully written. Let's drop the entry from
933                          * the queue.
934                          *
935                          * This isn't particularly optimized, but
936                          * well, this is supposed to be our worst-case
937                          * buffer only, and the socket buffer is
938                          * supposed to be our primary buffer, and if
939                          * it got full, then all bets are off
940                          * anyway. */
941
942                         sd_bus_message_unref(bus->wqueue[0]);
943                         bus->wqueue_size --;
944                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
945                         bus->windex = 0;
946
947                         ret = 1;
948                 }
949         }
950
951         return ret;
952 }
953
954 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
955         sd_bus_message *z = NULL;
956         int r, ret = 0;
957
958         assert(bus);
959         assert(m);
960         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
961
962         if (bus->fd < 0)
963                 return -ENOTCONN;
964
965         if (bus->rqueue_size > 0) {
966                 /* Dispatch a queued message */
967
968                 *m = bus->rqueue[0];
969                 bus->rqueue_size --;
970                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
971                 return 1;
972         }
973
974         /* Try to read a new message */
975         do {
976                 r = bus_socket_read_message(bus, &z);
977                 if (r < 0) {
978                         sd_bus_close(bus);
979                         return r;
980                 }
981                 if (r == 0)
982                         return ret;
983
984                 r = 1;
985         } while (!z);
986
987         *m = z;
988         return 1;
989 }
990
991 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
992         int r;
993
994         if (!bus)
995                 return -EINVAL;
996         if (bus->state == BUS_UNSET)
997                 return -ENOTCONN;
998         if (bus->fd < 0)
999                 return -ENOTCONN;
1000         if (!m)
1001                 return -EINVAL;
1002
1003         if (m->n_fds > 0) {
1004                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1005                 if (r < 0)
1006                         return r;
1007                 if (r == 0)
1008                         return -ENOTSUP;
1009         }
1010
1011         /* If the serial number isn't kept, then we know that no reply
1012          * is expected */
1013         if (!serial && !m->sealed)
1014                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1015
1016         r = bus_seal_message(bus, m);
1017         if (r < 0)
1018                 return r;
1019
1020         /* If this is a reply and no reply was requested, then let's
1021          * suppress this, if we can */
1022         if (m->dont_send && !serial)
1023                 return 0;
1024
1025         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1026                 size_t idx = 0;
1027
1028                 r = bus_socket_write_message(bus, m, &idx);
1029                 if (r < 0) {
1030                         sd_bus_close(bus);
1031                         return r;
1032                 } else if (idx < m->size)  {
1033                         /* Wasn't fully written. So let's remember how
1034                          * much was written. Note that the first entry
1035                          * of the wqueue array is always allocated so
1036                          * that we always can remember how much was
1037                          * written. */
1038                         bus->wqueue[0] = sd_bus_message_ref(m);
1039                         bus->wqueue_size = 1;
1040                         bus->windex = idx;
1041                 }
1042         } else {
1043                 sd_bus_message **q;
1044
1045                 /* Just append it to the queue. */
1046
1047                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1048                         return -ENOBUFS;
1049
1050                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1051                 if (!q)
1052                         return -ENOMEM;
1053
1054                 bus->wqueue = q;
1055                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1056         }
1057
1058         if (serial)
1059                 *serial = BUS_MESSAGE_SERIAL(m);
1060
1061         return 0;
1062 }
1063
1064 static usec_t calc_elapse(uint64_t usec) {
1065         if (usec == (uint64_t) -1)
1066                 return 0;
1067
1068         if (usec == 0)
1069                 usec = BUS_DEFAULT_TIMEOUT;
1070
1071         return now(CLOCK_MONOTONIC) + usec;
1072 }
1073
1074 static int timeout_compare(const void *a, const void *b) {
1075         const struct reply_callback *x = a, *y = b;
1076
1077         if (x->timeout != 0 && y->timeout == 0)
1078                 return -1;
1079
1080         if (x->timeout == 0 && y->timeout != 0)
1081                 return 1;
1082
1083         if (x->timeout < y->timeout)
1084                 return -1;
1085
1086         if (x->timeout > y->timeout)
1087                 return 1;
1088
1089         return 0;
1090 }
1091
1092 int sd_bus_send_with_reply(
1093                 sd_bus *bus,
1094                 sd_bus_message *m,
1095                 sd_message_handler_t callback,
1096                 void *userdata,
1097                 uint64_t usec,
1098                 uint64_t *serial) {
1099
1100         struct reply_callback *c;
1101         int r;
1102
1103         if (!bus)
1104                 return -EINVAL;
1105         if (bus->state == BUS_UNSET)
1106                 return -ENOTCONN;
1107         if (bus->fd < 0)
1108                 return -ENOTCONN;
1109         if (!m)
1110                 return -EINVAL;
1111         if (!callback)
1112                 return -EINVAL;
1113         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1114                 return -EINVAL;
1115         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1116                 return -EINVAL;
1117
1118         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1119         if (r < 0)
1120                 return r;
1121
1122         if (usec != (uint64_t) -1) {
1123                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1124                 if (r < 0)
1125                         return r;
1126         }
1127
1128         r = bus_seal_message(bus, m);
1129         if (r < 0)
1130                 return r;
1131
1132         c = new(struct reply_callback, 1);
1133         if (!c)
1134                 return -ENOMEM;
1135
1136         c->callback = callback;
1137         c->userdata = userdata;
1138         c->serial = BUS_MESSAGE_SERIAL(m);
1139         c->timeout = calc_elapse(usec);
1140
1141         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1142         if (r < 0) {
1143                 free(c);
1144                 return r;
1145         }
1146
1147         if (c->timeout != 0) {
1148                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1149                 if (r < 0) {
1150                         c->timeout = 0;
1151                         sd_bus_send_with_reply_cancel(bus, c->serial);
1152                         return r;
1153                 }
1154         }
1155
1156         r = sd_bus_send(bus, m, serial);
1157         if (r < 0) {
1158                 sd_bus_send_with_reply_cancel(bus, c->serial);
1159                 return r;
1160         }
1161
1162         return r;
1163 }
1164
1165 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1166         struct reply_callback *c;
1167
1168         if (!bus)
1169                 return -EINVAL;
1170         if (serial == 0)
1171                 return -EINVAL;
1172
1173         c = hashmap_remove(bus->reply_callbacks, &serial);
1174         if (!c)
1175                 return 0;
1176
1177         if (c->timeout != 0)
1178                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1179
1180         free(c);
1181         return 1;
1182 }
1183
1184 int bus_ensure_running(sd_bus *bus) {
1185         int r;
1186
1187         assert(bus);
1188
1189         if (bus->fd < 0)
1190                 return -ENOTCONN;
1191         if (bus->state == BUS_UNSET)
1192                 return -ENOTCONN;
1193
1194         if (bus->state == BUS_RUNNING)
1195                 return 1;
1196
1197         for (;;) {
1198                 r = sd_bus_process(bus, NULL);
1199                 if (r < 0)
1200                         return r;
1201                 if (bus->state == BUS_RUNNING)
1202                         return 1;
1203                 if (r > 0)
1204                         continue;
1205
1206                 r = sd_bus_wait(bus, (uint64_t) -1);
1207                 if (r < 0)
1208                         return r;
1209         }
1210 }
1211
1212 int sd_bus_send_with_reply_and_block(
1213                 sd_bus *bus,
1214                 sd_bus_message *m,
1215                 uint64_t usec,
1216                 sd_bus_error *error,
1217                 sd_bus_message **reply) {
1218
1219         int r;
1220         usec_t timeout;
1221         uint64_t serial;
1222         bool room = false;
1223
1224         if (!bus)
1225                 return -EINVAL;
1226         if (bus->fd < 0)
1227                 return -ENOTCONN;
1228         if (bus->state == BUS_UNSET)
1229                 return -ENOTCONN;
1230         if (!m)
1231                 return -EINVAL;
1232         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1233                 return -EINVAL;
1234         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1235                 return -EINVAL;
1236         if (bus_error_is_dirty(error))
1237                 return -EINVAL;
1238
1239         r = bus_ensure_running(bus);
1240         if (r < 0)
1241                 return r;
1242
1243         r = sd_bus_send(bus, m, &serial);
1244         if (r < 0)
1245                 return r;
1246
1247         timeout = calc_elapse(usec);
1248
1249         for (;;) {
1250                 usec_t left;
1251                 sd_bus_message *incoming = NULL;
1252
1253                 if (!room) {
1254                         sd_bus_message **q;
1255
1256                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1257                                 return -ENOBUFS;
1258
1259                         /* Make sure there's room for queuing this
1260                          * locally, before we read the message */
1261
1262                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1263                         if (!q)
1264                                 return -ENOMEM;
1265
1266                         bus->rqueue = q;
1267                         room = true;
1268                 }
1269
1270                 r = bus_socket_read_message(bus, &incoming);
1271                 if (r < 0)
1272                         return r;
1273                 if (incoming) {
1274
1275                         if (incoming->reply_serial == serial) {
1276                                 /* Found a match! */
1277
1278                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1279                                         *reply = incoming;
1280                                         return 0;
1281                                 }
1282
1283                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1284                                         int k;
1285
1286                                         r = sd_bus_error_copy(error, &incoming->error);
1287                                         if (r < 0) {
1288                                                 sd_bus_message_unref(incoming);
1289                                                 return r;
1290                                         }
1291
1292                                         k = bus_error_to_errno(&incoming->error);
1293                                         sd_bus_message_unref(incoming);
1294                                         return k;
1295                                 }
1296
1297                                 sd_bus_message_unref(incoming);
1298                                 return -EIO;
1299                         }
1300
1301                         /* There's already guaranteed to be room for
1302                          * this, so need to resize things here */
1303                         bus->rqueue[bus->rqueue_size ++] = incoming;
1304                         room = false;
1305
1306                         /* Try to read more, right-away */
1307                         continue;
1308                 }
1309                 if (r != 0)
1310                         continue;
1311
1312                 if (timeout > 0) {
1313                         usec_t n;
1314
1315                         n = now(CLOCK_MONOTONIC);
1316                         if (n >= timeout)
1317                                 return -ETIMEDOUT;
1318
1319                         left = timeout - n;
1320                 } else
1321                         left = (uint64_t) -1;
1322
1323                 r = bus_poll(bus, true, left);
1324                 if (r < 0)
1325                         return r;
1326
1327                 r = dispatch_wqueue(bus);
1328                 if (r < 0)
1329                         return r;
1330         }
1331 }
1332
1333 int sd_bus_get_fd(sd_bus *bus) {
1334         if (!bus)
1335                 return -EINVAL;
1336
1337         if (bus->fd < 0)
1338                 return -ENOTCONN;
1339
1340         return bus->fd;
1341 }
1342
1343 int sd_bus_get_events(sd_bus *bus) {
1344         int flags = 0;
1345
1346         if (!bus)
1347                 return -EINVAL;
1348         if (bus->state == BUS_UNSET)
1349                 return -ENOTCONN;
1350         if (bus->fd < 0)
1351                 return -ENOTCONN;
1352
1353         if (bus->state == BUS_OPENING)
1354                 flags |= POLLOUT;
1355         else if (bus->state == BUS_AUTHENTICATING) {
1356
1357                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1358                         flags |= POLLOUT;
1359
1360                 flags |= POLLIN;
1361
1362         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1363                 if (bus->rqueue_size <= 0)
1364                         flags |= POLLIN;
1365                 if (bus->wqueue_size > 0)
1366                         flags |= POLLOUT;
1367         }
1368
1369         return flags;
1370 }
1371
1372 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1373         struct reply_callback *c;
1374
1375         if (!bus)
1376                 return -EINVAL;
1377         if (!timeout_usec)
1378                 return -EINVAL;
1379         if (bus->state == BUS_UNSET)
1380                 return -ENOTCONN;
1381         if (bus->fd < 0)
1382                 return -ENOTCONN;
1383
1384         if (bus->state == BUS_AUTHENTICATING) {
1385                 *timeout_usec = bus->auth_timeout;
1386                 return 1;
1387         }
1388
1389         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1390                 return 0;
1391
1392         c = prioq_peek(bus->reply_callbacks_prioq);
1393         if (!c)
1394                 return 0;
1395
1396         *timeout_usec = c->timeout;
1397         return 1;
1398 }
1399
1400 static int process_timeout(sd_bus *bus) {
1401         struct reply_callback *c;
1402         usec_t n;
1403         int r;
1404
1405         assert(bus);
1406
1407         c = prioq_peek(bus->reply_callbacks_prioq);
1408         if (!c)
1409                 return 0;
1410
1411         n = now(CLOCK_MONOTONIC);
1412         if (c->timeout > n)
1413                 return 0;
1414
1415         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1416         hashmap_remove(bus->reply_callbacks, &c->serial);
1417
1418         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1419         free(c);
1420
1421         return r < 0 ? r : 1;
1422 }
1423
1424 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1425         struct reply_callback *c;
1426         int r;
1427
1428         assert(bus);
1429         assert(m);
1430
1431         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1432             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1433                 return 0;
1434
1435         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1436         if (!c)
1437                 return 0;
1438
1439         if (c->timeout != 0)
1440                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1441
1442         r = c->callback(bus, 0, m, c->userdata);
1443         free(c);
1444
1445         return r;
1446 }
1447
1448 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1449         struct filter_callback *l;
1450         int r;
1451
1452         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1453                 r = l->callback(bus, 0, m, l->userdata);
1454                 if (r != 0)
1455                         return r;
1456         }
1457
1458         return 0;
1459 }
1460
1461 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1462         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1463         int r;
1464
1465         assert(bus);
1466         assert(m);
1467
1468         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1469                 return 0;
1470
1471         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1472                 return 0;
1473
1474         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1475                 return 1;
1476
1477         if (streq_ptr(m->member, "Ping"))
1478                 r = sd_bus_message_new_method_return(bus, m, &reply);
1479         else if (streq_ptr(m->member, "GetMachineId")) {
1480                 sd_id128_t id;
1481                 char sid[33];
1482
1483                 r = sd_id128_get_machine(&id);
1484                 if (r < 0)
1485                         return r;
1486
1487                 r = sd_bus_message_new_method_return(bus, m, &reply);
1488                 if (r < 0)
1489                         return r;
1490
1491                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1492         } else {
1493                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1494
1495                 sd_bus_error_set(&error,
1496                                  "org.freedesktop.DBus.Error.UnknownMethod",
1497                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1498
1499                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1500         }
1501
1502         if (r < 0)
1503                 return r;
1504
1505         r = sd_bus_send(bus, reply, NULL);
1506         if (r < 0)
1507                 return r;
1508
1509         return 1;
1510 }
1511
1512 static int process_object(sd_bus *bus, sd_bus_message *m) {
1513         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1514         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1515         struct object_callback *c;
1516         char *p;
1517         int r;
1518         bool found = false;
1519
1520         assert(bus);
1521         assert(m);
1522
1523         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1524                 return 0;
1525
1526         if (hashmap_isempty(bus->object_callbacks))
1527                 return 0;
1528
1529         c = hashmap_get(bus->object_callbacks, m->path);
1530         if (c) {
1531                 r = c->callback(bus, 0, m, c->userdata);
1532                 if (r != 0)
1533                         return r;
1534
1535                 found = true;
1536         }
1537
1538         /* Look for fallback prefixes */
1539         p = strdupa(m->path);
1540         for (;;) {
1541                 char *e;
1542
1543                 e = strrchr(p, '/');
1544                 if (e == p || !e)
1545                         break;
1546
1547                 *e = 0;
1548
1549                 c = hashmap_get(bus->object_callbacks, p);
1550                 if (c && c->is_fallback) {
1551                         r = c->callback(bus, 0, m, c->userdata);
1552                         if (r != 0)
1553                                 return r;
1554
1555                         found = true;
1556                 }
1557         }
1558
1559         /* We found some handlers but none wanted to take this, then
1560          * return this -- with one exception, we can handle
1561          * introspection minimally ourselves */
1562         if (!found || sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1563                 return 0;
1564
1565         sd_bus_error_set(&error,
1566                          "org.freedesktop.DBus.Error.UnknownMethod",
1567                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1568
1569         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1570         if (r < 0)
1571                 return r;
1572
1573         r = sd_bus_send(bus, reply, NULL);
1574         if (r < 0)
1575                 return r;
1576
1577         return 1;
1578 }
1579
1580 static int process_introspect(sd_bus *bus, sd_bus_message *m) {
1581         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1582         _cleanup_free_ char *introspection = NULL;
1583         _cleanup_set_free_free_ Set *s = NULL;
1584         _cleanup_fclose_ FILE *f = NULL;
1585         struct object_callback *c;
1586         Iterator i;
1587         size_t size = 0;
1588         char *node;
1589         int r;
1590
1591         assert(bus);
1592         assert(m);
1593
1594         if (!sd_bus_message_is_method_call(m, "org.freedesktop.DBus.Introspectable", "Introspect"))
1595                 return 0;
1596
1597         if (!m->path)
1598                 return 0;
1599
1600         s = set_new(string_hash_func, string_compare_func);
1601         if (!s)
1602                 return -ENOMEM;
1603
1604         HASHMAP_FOREACH(c, bus->object_callbacks, i) {
1605                 const char *e;
1606                 char *a, *p;
1607
1608                 if (streq(c->path, "/"))
1609                         continue;
1610
1611                 if (streq(m->path, "/"))
1612                         e = c->path;
1613                 else {
1614                         e = startswith(c->path, m->path);
1615                         if (!e || *e != '/')
1616                                 continue;
1617                 }
1618
1619                 a = strdup(e+1);
1620                 if (!a)
1621                         return -ENOMEM;
1622
1623                 p = strchr(a, '/');
1624                 if (p)
1625                         *p = 0;
1626
1627                 r = set_put(s, a);
1628                 if (r < 0) {
1629                         free(a);
1630
1631                         if (r != -EEXIST)
1632                                 return r;
1633                 }
1634         }
1635
1636         f = open_memstream(&introspection, &size);
1637         if (!f)
1638                 return -ENOMEM;
1639
1640         fputs(SD_BUS_INTROSPECT_DOCTYPE, f);
1641         fputs("<node>\n", f);
1642         fputs(SD_BUS_INTROSPECT_INTERFACE_PEER, f);
1643         fputs(SD_BUS_INTROSPECT_INTERFACE_INTROSPECTABLE, f);
1644
1645         while ((node = set_steal_first(s))) {
1646                 fprintf(f, " <node name=\"%s\"/>\n", node);
1647                 free(node);
1648         }
1649
1650         fputs("</node>\n", f);
1651
1652         fflush(f);
1653
1654         if (ferror(f))
1655                 return -ENOMEM;
1656
1657         r = sd_bus_message_new_method_return(bus, m, &reply);
1658         if (r < 0)
1659                 return r;
1660
1661         r = sd_bus_message_append(reply, "s", introspection);
1662         if (r < 0)
1663                 return r;
1664
1665         r = sd_bus_send(bus, reply, NULL);
1666         if (r < 0)
1667                 return r;
1668
1669         return 1;
1670 }
1671
1672 static int process_message(sd_bus *bus, sd_bus_message *m) {
1673         int r;
1674
1675         assert(bus);
1676         assert(m);
1677
1678         r = process_reply(bus, m);
1679         if (r != 0)
1680                 return r;
1681
1682         r = process_filter(bus, m);
1683         if (r != 0)
1684                 return r;
1685
1686         r = process_builtin(bus, m);
1687         if (r != 0)
1688                 return r;
1689
1690         r = process_object(bus, m);
1691         if (r != 0)
1692                 return r;
1693
1694         return process_introspect(bus, m);
1695 }
1696
1697 static int process_running(sd_bus *bus, sd_bus_message **ret) {
1698         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1699         int r;
1700
1701         assert(bus);
1702         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1703
1704         r = process_timeout(bus);
1705         if (r != 0)
1706                 goto null_message;
1707
1708         r = dispatch_wqueue(bus);
1709         if (r != 0)
1710                 goto null_message;
1711
1712         r = dispatch_rqueue(bus, &m);
1713         if (r < 0)
1714                 return r;
1715         if (!m)
1716                 goto null_message;
1717
1718         r = process_message(bus, m);
1719         if (r != 0)
1720                 goto null_message;
1721
1722         if (ret) {
1723                 *ret = m;
1724                 m = NULL;
1725                 return 1;
1726         }
1727
1728         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1729                 _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1730                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1731
1732                 sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1733
1734                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1735                 if (r < 0)
1736                         return r;
1737
1738                 r = sd_bus_send(bus, reply, NULL);
1739                 if (r < 0)
1740                         return r;
1741         }
1742
1743         return 1;
1744
1745 null_message:
1746         if (r >= 0 && ret)
1747                 *ret = NULL;
1748
1749         return r;
1750 }
1751
1752 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1753         int r;
1754
1755         /* Returns 0 when we didn't do anything. This should cause the
1756          * caller to invoke sd_bus_wait() before returning the next
1757          * time. Returns > 0 when we did something, which possibly
1758          * means *ret is filled in with an unprocessed message. */
1759
1760         if (!bus)
1761                 return -EINVAL;
1762         if (bus->fd < 0)
1763                 return -ENOTCONN;
1764
1765         switch (bus->state) {
1766
1767         case BUS_UNSET:
1768                 return -ENOTCONN;
1769
1770         case BUS_OPENING:
1771                 r = bus_socket_process_opening(bus);
1772                 if (r < 0)
1773                         return r;
1774                 if (ret)
1775                         *ret = NULL;
1776                 return r;
1777
1778         case BUS_AUTHENTICATING:
1779
1780                 r = bus_socket_process_authenticating(bus);
1781                 if (r < 0)
1782                         return r;
1783                 if (ret)
1784                         *ret = NULL;
1785                 return r;
1786
1787         case BUS_RUNNING:
1788         case BUS_HELLO:
1789
1790                 return process_running(bus, ret);
1791         }
1792
1793         assert_not_reached("Unknown state");
1794 }
1795
1796 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1797         struct pollfd p;
1798         int r, e;
1799         struct timespec ts;
1800         usec_t until, m;
1801
1802         assert(bus);
1803
1804         if (bus->fd < 0)
1805                 return -ENOTCONN;
1806
1807         e = sd_bus_get_events(bus);
1808         if (e < 0)
1809                 return e;
1810
1811         if (need_more)
1812                 e |= POLLIN;
1813
1814         r = sd_bus_get_timeout(bus, &until);
1815         if (r < 0)
1816                 return r;
1817         if (r == 0)
1818                 m = (uint64_t) -1;
1819         else {
1820                 usec_t n;
1821                 n = now(CLOCK_MONOTONIC);
1822                 m = until > n ? until - n : 0;
1823         }
1824
1825         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1826                 m = timeout_usec;
1827
1828         zero(p);
1829         p.fd = bus->fd;
1830         p.events = e;
1831
1832         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1833         if (r < 0)
1834                 return -errno;
1835
1836         return r > 0 ? 1 : 0;
1837 }
1838
1839 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1840
1841         if (!bus)
1842                 return -EINVAL;
1843         if (bus->state == BUS_UNSET)
1844                 return -ENOTCONN;
1845         if (bus->fd < 0)
1846                 return -ENOTCONN;
1847         if (bus->rqueue_size > 0)
1848                 return 0;
1849
1850         return bus_poll(bus, false, timeout_usec);
1851 }
1852
1853 int sd_bus_flush(sd_bus *bus) {
1854         int r;
1855
1856         if (!bus)
1857                 return -EINVAL;
1858         if (bus->state == BUS_UNSET)
1859                 return -ENOTCONN;
1860         if (bus->fd < 0)
1861                 return -ENOTCONN;
1862
1863         r = bus_ensure_running(bus);
1864         if (r < 0)
1865                 return r;
1866
1867         if (bus->wqueue_size <= 0)
1868                 return 0;
1869
1870         for (;;) {
1871                 r = dispatch_wqueue(bus);
1872                 if (r < 0)
1873                         return r;
1874
1875                 if (bus->wqueue_size <= 0)
1876                         return 0;
1877
1878                 r = bus_poll(bus, false, (uint64_t) -1);
1879                 if (r < 0)
1880                         return r;
1881         }
1882 }
1883
1884 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1885         struct filter_callback *f;
1886
1887         if (!bus)
1888                 return -EINVAL;
1889         if (!callback)
1890                 return -EINVAL;
1891
1892         f = new(struct filter_callback, 1);
1893         if (!f)
1894                 return -ENOMEM;
1895         f->callback = callback;
1896         f->userdata = userdata;
1897
1898         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1899         return 0;
1900 }
1901
1902 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1903         struct filter_callback *f;
1904
1905         if (!bus)
1906                 return -EINVAL;
1907         if (!callback)
1908                 return -EINVAL;
1909
1910         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1911                 if (f->callback == callback && f->userdata == userdata) {
1912                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1913                         free(f);
1914                         return 1;
1915                 }
1916         }
1917
1918         return 0;
1919 }
1920
1921 static int bus_add_object(
1922                 sd_bus *bus,
1923                 bool fallback,
1924                 const char *path,
1925                 sd_message_handler_t callback,
1926                 void *userdata) {
1927
1928         struct object_callback *c;
1929         int r;
1930
1931         if (!bus)
1932                 return -EINVAL;
1933         if (!path)
1934                 return -EINVAL;
1935         if (!callback)
1936                 return -EINVAL;
1937
1938         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1939         if (r < 0)
1940                 return r;
1941
1942         c = new(struct object_callback, 1);
1943         if (!c)
1944                 return -ENOMEM;
1945
1946         c->path = strdup(path);
1947         if (!path) {
1948                 free(c);
1949                 return -ENOMEM;
1950         }
1951
1952         c->callback = callback;
1953         c->userdata = userdata;
1954         c->is_fallback = fallback;
1955
1956         r = hashmap_put(bus->object_callbacks, c->path, c);
1957         if (r < 0) {
1958                 free(c->path);
1959                 free(c);
1960                 return r;
1961         }
1962
1963         return 0;
1964 }
1965
1966 static int bus_remove_object(
1967                 sd_bus *bus,
1968                 bool fallback,
1969                 const char *path,
1970                 sd_message_handler_t callback,
1971                 void *userdata) {
1972
1973         struct object_callback *c;
1974
1975         if (!bus)
1976                 return -EINVAL;
1977         if (!path)
1978                 return -EINVAL;
1979         if (!callback)
1980                 return -EINVAL;
1981
1982         c = hashmap_get(bus->object_callbacks, path);
1983         if (!c)
1984                 return 0;
1985
1986         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
1987                 return 0;
1988
1989         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
1990
1991         free(c->path);
1992         free(c);
1993
1994         return 1;
1995 }
1996
1997 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
1998         return bus_add_object(bus, false, path, callback, userdata);
1999 }
2000
2001 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2002         return bus_remove_object(bus, false, path, callback, userdata);
2003 }
2004
2005 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2006         return bus_add_object(bus, true, prefix, callback, userdata);
2007 }
2008
2009 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2010         return bus_remove_object(bus, true, prefix, callback, userdata);
2011 }