chiark / gitweb /
88f17a26003e672e7d189c6d5c5e8fc627b84726
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "missing.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         struct object_callback *c;
44         unsigned i;
45
46         assert(b);
47
48         if (b->fd >= 0)
49                 close_nointr_nofail(b->fd);
50
51         free(b->rbuffer);
52         free(b->unique_name);
53         free(b->auth_uid);
54         free(b->address);
55
56         for (i = 0; i < b->rqueue_size; i++)
57                 sd_bus_message_unref(b->rqueue[i]);
58         free(b->rqueue);
59
60         for (i = 0; i < b->wqueue_size; i++)
61                 sd_bus_message_unref(b->wqueue[i]);
62         free(b->wqueue);
63
64         hashmap_free_free(b->reply_callbacks);
65         prioq_free(b->reply_callbacks_prioq);
66
67         while ((f = b->filter_callbacks)) {
68                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
69                 free(f);
70         }
71
72         while ((c = hashmap_steal_first(b->object_callbacks))) {
73                 free(c->path);
74                 free(c);
75         }
76
77         hashmap_free(b->object_callbacks);
78
79         free(b);
80 }
81
82 static sd_bus* bus_new(void) {
83         sd_bus *r;
84
85         r = new0(sd_bus, 1);
86         if (!r)
87                 return NULL;
88
89         r->n_ref = 1;
90         r->fd = -1;
91         r->message_version = 1;
92
93         /* We guarantee that wqueue always has space for at least one
94          * entry */
95         r->wqueue = new(sd_bus_message*, 1);
96         if (!r->wqueue) {
97                 free(r);
98                 return NULL;
99         }
100
101         return r;
102 };
103
104 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
105         const char *s;
106         int r;
107
108         assert(bus);
109
110         if (error != 0)
111                 return -error;
112
113         assert(reply);
114
115         r = sd_bus_message_read(reply, "s", &s);
116         if (r < 0)
117                 return r;
118
119         if (!service_name_is_valid(s) || s[0] != ':')
120                 return -EBADMSG;
121
122         bus->unique_name = strdup(s);
123         if (!bus->unique_name)
124                 return -ENOMEM;
125
126         bus->state = BUS_RUNNING;
127
128         return 1;
129 }
130
131 static int bus_send_hello(sd_bus *bus) {
132         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
133         int r;
134
135         assert(bus);
136
137         r = sd_bus_message_new_method_call(
138                         bus,
139                         "org.freedesktop.DBus",
140                         "/",
141                         "org.freedesktop.DBus",
142                         "Hello",
143                         &m);
144         if (r < 0)
145                 return r;
146
147         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
148         if (r < 0)
149                 return r;
150
151         bus->sent_hello = true;
152         return r;
153 }
154
155 static int bus_start_running(sd_bus *bus) {
156         assert(bus);
157
158         if (bus->sent_hello) {
159                 bus->state = BUS_HELLO;
160                 return 1;
161         }
162
163         bus->state = BUS_RUNNING;
164         return 1;
165 }
166
167 static int parse_address_key(const char **p, const char *key, char **value) {
168         size_t l, n = 0;
169         const char *a;
170         char *r = NULL;
171
172         assert(p);
173         assert(*p);
174         assert(key);
175         assert(value);
176
177         l = strlen(key);
178         if (strncmp(*p, key, l) != 0)
179                 return 0;
180
181         if ((*p)[l] != '=')
182                 return 0;
183
184         if (*value)
185                 return -EINVAL;
186
187         a = *p + l + 1;
188         while (*a != ',' && *a != 0) {
189                 char c, *t;
190
191                 if (*a == '%') {
192                         int x, y;
193
194                         x = unhexchar(a[1]);
195                         if (x < 0) {
196                                 free(r);
197                                 return x;
198                         }
199
200                         y = unhexchar(a[2]);
201                         if (y < 0) {
202                                 free(r);
203                                 return y;
204                         }
205
206                         c = (char) ((x << 4) | y);
207                         a += 3;
208                 } else {
209                         c = *a;
210                         a++;
211                 }
212
213                 t = realloc(r, n + 2);
214                 if (!t) {
215                         free(r);
216                         return -ENOMEM;
217                 }
218
219                 r = t;
220                 r[n++] = c;
221         }
222
223         if (!r) {
224                 r = strdup("");
225                 if (!r)
226                         return -ENOMEM;
227         } else
228                 r[n] = 0;
229
230         if (*a == ',')
231                 a++;
232
233         *p = a;
234         *value = r;
235         return 1;
236 }
237
238 static void skip_address_key(const char **p) {
239         assert(p);
240         assert(*p);
241
242         *p += strcspn(*p, ",");
243
244         if (**p == ',')
245                 (*p) ++;
246 }
247
248 static int bus_parse_next_address(sd_bus *b) {
249         const char *a, *p;
250         _cleanup_free_ char *guid = NULL;
251         int r;
252
253         assert(b);
254
255         if (!b->address)
256                 return 0;
257         if (b->address[b->address_index] == 0)
258                 return 0;
259
260         a = b->address + b->address_index;
261
262         zero(b->sockaddr);
263         b->sockaddr_size = 0;
264         b->peer = SD_ID128_NULL;
265
266         if (startswith(a, "unix:")) {
267                 _cleanup_free_ char *path = NULL, *abstract = NULL;
268
269                 p = a + 5;
270                 while (*p != 0) {
271                         r = parse_address_key(&p, "guid", &guid);
272                         if (r < 0)
273                                 return r;
274                         else if (r > 0)
275                                 continue;
276
277                         r = parse_address_key(&p, "path", &path);
278                         if (r < 0)
279                                 return r;
280                         else if (r > 0)
281                                 continue;
282
283                         r = parse_address_key(&p, "abstract", &abstract);
284                         if (r < 0)
285                                 return r;
286                         else if (r > 0)
287                                 continue;
288
289                         skip_address_key(&p);
290                 }
291
292                 if (!path && !abstract)
293                         return -EINVAL;
294
295                 if (path && abstract)
296                         return -EINVAL;
297
298                 if (path) {
299                         size_t l;
300
301                         l = strlen(path);
302                         if (l > sizeof(b->sockaddr.un.sun_path))
303                                 return -E2BIG;
304
305                         b->sockaddr.un.sun_family = AF_UNIX;
306                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
307                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
308                 } else if (abstract) {
309                         size_t l;
310
311                         l = strlen(abstract);
312                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
313                                 return -E2BIG;
314
315                         b->sockaddr.un.sun_family = AF_UNIX;
316                         b->sockaddr.un.sun_path[0] = 0;
317                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
318                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
319                 }
320
321         } else if (startswith(a, "tcp:")) {
322                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
323                 struct addrinfo hints, *result;
324
325                 p = a + 4;
326                 while (*p != 0) {
327                         r = parse_address_key(&p, "guid", &guid);
328                         if (r < 0)
329                                 return r;
330                         else if (r > 0)
331                                 continue;
332
333                         r = parse_address_key(&p, "host", &host);
334                         if (r < 0)
335                                 return r;
336                         else if (r > 0)
337                                 continue;
338
339                         r = parse_address_key(&p, "port", &port);
340                         if (r < 0)
341                                 return r;
342                         else if (r > 0)
343                                 continue;
344
345                         r = parse_address_key(&p, "family", &family);
346                         if (r < 0)
347                                 return r;
348                         else if (r > 0)
349                                 continue;
350
351                         skip_address_key(&p);
352                 }
353
354                 if (!host || !port)
355                         return -EINVAL;
356
357                 zero(hints);
358                 hints.ai_socktype = SOCK_STREAM;
359                 hints.ai_flags = AI_ADDRCONFIG;
360
361                 if (family) {
362                         if (streq(family, "ipv4"))
363                                 hints.ai_family = AF_INET;
364                         else if (streq(family, "ipv6"))
365                                 hints.ai_family = AF_INET6;
366                         else
367                                 return -EINVAL;
368                 }
369
370                 r = getaddrinfo(host, port, &hints, &result);
371                 if (r == EAI_SYSTEM)
372                         return -errno;
373                 else if (r != 0)
374                         return -EADDRNOTAVAIL;
375
376                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
377                 b->sockaddr_size = result->ai_addrlen;
378
379                 freeaddrinfo(result);
380         }
381
382         if (guid) {
383                 r = sd_id128_from_string(guid, &b->peer);
384                 if (r < 0)
385                         return r;
386         }
387
388         b->address_index = p - b->address;
389         return 1;
390 }
391
392 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
393
394         while (size > 0) {
395                 struct iovec *i = iov + *idx;
396
397                 if (i->iov_len > size) {
398                         i->iov_base = (uint8_t*) i->iov_base + size;
399                         i->iov_len -= size;
400                         return;
401                 }
402
403                 size -= i->iov_len;
404
405                 i->iov_base = NULL;
406                 i->iov_len = 0;
407
408                 (*idx) ++;
409         }
410 }
411
412 static int bus_write_auth(sd_bus *b) {
413         struct msghdr mh;
414         ssize_t k;
415
416         assert(b);
417         assert(b->state == BUS_AUTHENTICATING);
418
419         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
420                 return 0;
421
422         if (b->auth_timeout == 0)
423                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
424
425         zero(mh);
426         mh.msg_iov = b->auth_iovec + b->auth_index;
427         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
428
429         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
430         if (k < 0)
431                 return errno == EAGAIN ? 0 : -errno;
432
433         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
434
435         return 1;
436 }
437
438 static int bus_auth_verify(sd_bus *b) {
439         char *e, *f;
440         sd_id128_t peer;
441         unsigned i;
442         int r;
443
444         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
445          * that's it */
446
447         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
448         if (!e)
449                 return 0;
450
451         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
452         if (!f)
453                 return 0;
454
455         if (e - (char*) b->rbuffer != 3 + 32)
456                 return -EPERM;
457
458         if (memcmp(b->rbuffer, "OK ", 3))
459                 return -EPERM;
460
461         for (i = 0; i < 32; i += 2) {
462                 int x, y;
463
464                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
465                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
466
467                 if (x < 0 || y < 0)
468                         return -EINVAL;
469
470                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
471         }
472
473         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
474             !sd_id128_equal(b->peer, peer))
475                 return -EPERM;
476
477         b->peer = peer;
478
479         b->can_fds =
480                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
481                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
482
483         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
484         memmove(b->rbuffer, f + 2, b->rbuffer_size);
485
486         r = bus_start_running(b);
487         if (r < 0)
488                 return r;
489
490         return 1;
491 }
492
493 static int bus_read_auth(sd_bus *b) {
494         struct msghdr mh;
495         struct iovec iov;
496         size_t n;
497         ssize_t k;
498         int r;
499         void *p;
500
501         assert(b);
502
503         r = bus_auth_verify(b);
504         if (r != 0)
505                 return r;
506
507         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
508
509         if (n > BUS_AUTH_SIZE_MAX)
510                 n = BUS_AUTH_SIZE_MAX;
511
512         if (b->rbuffer_size >= n)
513                 return -ENOBUFS;
514
515         p = realloc(b->rbuffer, n);
516         if (!p)
517                 return -ENOMEM;
518
519         b->rbuffer = p;
520
521         zero(iov);
522         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
523         iov.iov_len = n - b->rbuffer_size;
524
525         zero(mh);
526         mh.msg_iov = &iov;
527         mh.msg_iovlen = 1;
528
529         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
530         if (k < 0)
531                 return errno == EAGAIN ? 0 : -errno;
532
533         b->rbuffer_size += k;
534
535         r = bus_auth_verify(b);
536         if (r != 0)
537                 return r;
538
539         return 1;
540 }
541
542 static int bus_setup_fd(sd_bus *b) {
543         int one;
544
545         assert(b);
546
547         /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
548          * just in case. This is actually irrelavant for */
549         one = 1;
550         setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
551         setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
552
553         /* Increase the buffers to a MB */
554         fd_inc_rcvbuf(b->fd, 1024*1024);
555         fd_inc_sndbuf(b->fd, 1024*1024);
556
557         return 0;
558 }
559
560 static int bus_start_auth(sd_bus *b) {
561         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
562         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
563
564         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
565         size_t l;
566
567         assert(b);
568
569         b->state = BUS_AUTHENTICATING;
570
571         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
572         char_array_0(text);
573
574         l = strlen(text);
575         b->auth_uid = hexmem(text, l);
576         if (!b->auth_uid)
577                 return -ENOMEM;
578
579         b->auth_iovec[0].iov_base = (void*) auth_prefix;
580         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
581         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
582         b->auth_iovec[1].iov_len = l * 2;
583         b->auth_iovec[2].iov_base = (void*) auth_suffix;
584         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
585         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
586
587         return bus_write_auth(b);
588 }
589
590 static int bus_start_connect(sd_bus *b) {
591         int r;
592
593         assert(b);
594         assert(b->fd < 0);
595
596         for (;;) {
597                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
598                         r = bus_parse_next_address(b);
599                         if (r < 0)
600                                 return r;
601                         if (r == 0)
602                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
603                 }
604
605                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
606                 if (b->fd < 0) {
607                         b->last_connect_error = errno;
608                         goto try_again;
609                 }
610
611                 r = bus_setup_fd(b);
612                 if (r < 0) {
613                         b->last_connect_error = errno;
614                         goto try_again;
615                 }
616
617                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
618                 if (r < 0) {
619                         if (errno == EINPROGRESS)
620                                 return 1;
621
622                         b->last_connect_error = errno;
623                         goto try_again;
624                 }
625
626                 return bus_start_auth(b);
627
628         try_again:
629                 zero(b->sockaddr);
630
631                 if (b->fd >= 0) {
632                         close_nointr_nofail(b->fd);
633                         b->fd = -1;
634                 }
635         }
636 }
637
638 int sd_bus_open_system(sd_bus **ret) {
639         const char *e;
640         sd_bus *b;
641         int r;
642
643         if (!ret)
644                 return -EINVAL;
645
646         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
647         if (e) {
648                 r = sd_bus_open_address(e, &b);
649                 if (r < 0)
650                         return r;
651         } else {
652                 b = bus_new();
653                 if (!b)
654                         return -ENOMEM;
655
656                 b->sockaddr.un.sun_family = AF_UNIX;
657                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
658                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
659
660                 r = bus_start_connect(b);
661                 if (r < 0) {
662                         bus_free(b);
663                         return r;
664                 }
665         }
666
667         r = bus_send_hello(b);
668         if (r < 0) {
669                 sd_bus_unref(b);
670                 return r;
671         }
672
673         *ret = b;
674         return 0;
675 }
676
677 int sd_bus_open_user(sd_bus **ret) {
678         const char *e;
679         sd_bus *b;
680         size_t l;
681         int r;
682
683         if (!ret)
684                 return -EINVAL;
685
686         e = getenv("DBUS_SESSION_BUS_ADDRESS");
687         if (e) {
688                 r = sd_bus_open_address(e, &b);
689                 if (r < 0)
690                         return r;
691         } else {
692                 e = getenv("XDG_RUNTIME_DIR");
693                 if (!e)
694                         return -ENOENT;
695
696                 l = strlen(e);
697                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
698                         return -E2BIG;
699
700                 b = bus_new();
701                 if (!b)
702                         return -ENOMEM;
703
704                 b->sockaddr.un.sun_family = AF_UNIX;
705                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
706                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
707
708                 r = bus_start_connect(b);
709                 if (r < 0) {
710                         bus_free(b);
711                         return r;
712                 }
713         }
714
715         r = bus_send_hello(b);
716         if (r < 0) {
717                 sd_bus_unref(b);
718                 return r;
719         }
720
721         *ret = b;
722         return 0;
723 }
724
725 int sd_bus_open_address(const char *address, sd_bus **ret) {
726         sd_bus *b;
727         int r;
728
729         if (!address)
730                 return -EINVAL;
731         if (!ret)
732                 return -EINVAL;
733
734         b = bus_new();
735         if (!b)
736                 return -ENOMEM;
737
738         b->address = strdup(address);
739         if (!b->address) {
740                 bus_free(b);
741                 return -ENOMEM;
742         }
743
744         r = bus_start_connect(b);
745         if (r < 0) {
746                 bus_free(b);
747                 return r;
748         }
749
750         *ret = b;
751         return 0;
752 }
753
754 int sd_bus_open_fd(int fd, sd_bus **ret) {
755         sd_bus *b;
756         int r;
757
758         if (fd < 0)
759                 return -EINVAL;
760         if (!ret)
761                 return -EINVAL;
762
763         b = bus_new();
764         if (!b)
765                 return -ENOMEM;
766
767         b->fd = fd;
768
769         r = fd_nonblock(b->fd, true);
770         if (r < 0)
771                 goto fail;
772
773         fd_cloexec(b->fd, true);
774         if (r < 0)
775                 goto fail;
776
777         r = bus_setup_fd(b);
778         if (r < 0)
779                 goto fail;
780
781         r = bus_start_auth(b);
782         if (r < 0)
783                 goto fail;
784
785         *ret = b;
786         return 0;
787
788 fail:
789                 bus_free(b);
790         return r;
791 }
792
793 void sd_bus_close(sd_bus *bus) {
794         if (!bus)
795                 return;
796         if (bus->fd < 0)
797                 return;
798
799         close_nointr_nofail(bus->fd);
800         bus->fd = -1;
801 }
802
803 sd_bus *sd_bus_ref(sd_bus *bus) {
804         if (!bus)
805                 return NULL;
806
807         assert(bus->n_ref > 0);
808
809         bus->n_ref++;
810         return bus;
811 }
812
813 sd_bus *sd_bus_unref(sd_bus *bus) {
814         if (!bus)
815                 return NULL;
816
817         assert(bus->n_ref > 0);
818         bus->n_ref--;
819
820         if (bus->n_ref <= 0)
821                 bus_free(bus);
822
823         return NULL;
824 }
825
826 int sd_bus_is_open(sd_bus *bus) {
827         if (!bus)
828                 return -EINVAL;
829
830         return bus->fd >= 0;
831 }
832
833 int sd_bus_can_send(sd_bus *bus, char type) {
834         int r;
835
836         if (!bus)
837                 return -EINVAL;
838
839         if (type == SD_BUS_TYPE_UNIX_FD) {
840                 r = bus_ensure_running(bus);
841                 if (r < 0)
842                         return r;
843
844                 return bus->can_fds;
845         }
846
847         return bus_type_is_valid(type);
848 }
849
850 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
851         int r;
852
853         if (!bus)
854                 return -EINVAL;
855         if (!peer)
856                 return -EINVAL;
857
858         r = bus_ensure_running(bus);
859         if (r < 0)
860                 return r;
861
862         *peer = bus->peer;
863         return 0;
864 }
865
866 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
867         assert(m);
868
869         if (m->header->version > b->message_version)
870                 return -EPERM;
871
872         if (m->sealed)
873                 return 0;
874
875         return bus_message_seal(m, ++b->serial);
876 }
877
878 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
879         struct msghdr mh;
880         struct iovec *iov;
881         ssize_t k;
882         size_t n;
883         unsigned j;
884
885         assert(bus);
886         assert(m);
887         assert(idx);
888         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
889
890         if (*idx >= m->size)
891                 return 0;
892
893         n = m->n_iovec * sizeof(struct iovec);
894         iov = alloca(n);
895         memcpy(iov, m->iovec, n);
896
897         j = 0;
898         iovec_advance(iov, &j, *idx);
899
900         zero(mh);
901         mh.msg_iov = iov;
902         mh.msg_iovlen = m->n_iovec;
903
904         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
905         if (k < 0)
906                 return errno == EAGAIN ? 0 : -errno;
907
908         *idx += (size_t) k;
909         return 1;
910 }
911
912 static int message_read_need(sd_bus *bus, size_t *need) {
913         uint32_t a, b;
914         uint8_t e;
915         uint64_t sum;
916
917         assert(bus);
918         assert(need);
919         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
920
921         if (bus->rbuffer_size < sizeof(struct bus_header)) {
922                 *need = sizeof(struct bus_header) + 8;
923
924                 /* Minimum message size:
925                  *
926                  * Header +
927                  *
928                  *  Method Call: +2 string headers
929                  *       Signal: +3 string headers
930                  * Method Error: +1 string headers
931                  *               +1 uint32 headers
932                  * Method Reply: +1 uint32 headers
933                  *
934                  * A string header is at least 9 bytes
935                  * A uint32 header is at least 8 bytes
936                  *
937                  * Hence the minimum message size of a valid message
938                  * is header + 8 bytes */
939
940                 return 0;
941         }
942
943         a = ((const uint32_t*) bus->rbuffer)[1];
944         b = ((const uint32_t*) bus->rbuffer)[3];
945
946         e = ((const uint8_t*) bus->rbuffer)[0];
947         if (e == SD_BUS_LITTLE_ENDIAN) {
948                 a = le32toh(a);
949                 b = le32toh(b);
950         } else if (e == SD_BUS_BIG_ENDIAN) {
951                 a = be32toh(a);
952                 b = be32toh(b);
953         } else
954                 return -EBADMSG;
955
956         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
957         if (sum >= BUS_MESSAGE_SIZE_MAX)
958                 return -ENOBUFS;
959
960         *need = (size_t) sum;
961         return 0;
962 }
963
964 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
965         sd_bus_message *t;
966         void *b = NULL;
967         int r;
968
969         assert(bus);
970         assert(m);
971         assert(bus->rbuffer_size >= size);
972         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
973
974         if (bus->rbuffer_size > size) {
975                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
976                 if (!b) {
977                         free(t);
978                         return -ENOMEM;
979                 }
980         }
981
982         r = bus_message_from_malloc(bus->rbuffer, size,
983                                     bus->ucred_valid ? &bus->ucred : NULL,
984                                     bus->label[0] ? bus->label : NULL, &t);
985         if (r < 0) {
986                 free(b);
987                 return r;
988         }
989
990         bus->rbuffer = b;
991         bus->rbuffer_size -= size;
992
993         *m = t;
994         return 1;
995 }
996
997 static int message_read(sd_bus *bus, sd_bus_message **m) {
998         struct msghdr mh;
999         struct iovec iov;
1000         ssize_t k;
1001         size_t need;
1002         int r;
1003         void *b;
1004         union {
1005                 struct cmsghdr cmsghdr;
1006                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1007                             CMSG_SPACE(NAME_MAX)]; /*selinux label */
1008         } control;
1009         struct cmsghdr *cmsg;
1010
1011         assert(bus);
1012         assert(m);
1013         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1014
1015         r = message_read_need(bus, &need);
1016         if (r < 0)
1017                 return r;
1018
1019         if (bus->rbuffer_size >= need)
1020                 return message_make(bus, need, m);
1021
1022         b = realloc(bus->rbuffer, need);
1023         if (!b)
1024                 return -ENOMEM;
1025
1026         bus->rbuffer = b;
1027
1028         zero(iov);
1029         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1030         iov.iov_len = need - bus->rbuffer_size;
1031
1032         zero(mh);
1033         mh.msg_iov = &iov;
1034         mh.msg_iovlen = 1;
1035         mh.msg_control = &control;
1036         mh.msg_controllen = sizeof(control);
1037
1038         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1039         if (k < 0)
1040                 return errno == EAGAIN ? 0 : -errno;
1041
1042         bus->rbuffer_size += k;
1043         bus->ucred_valid = false;
1044         bus->label[0] = 0;
1045
1046         for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1047                 if (cmsg->cmsg_level == SOL_SOCKET &&
1048                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1049                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1050
1051                         memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1052                         bus->ucred_valid = true;
1053
1054                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1055                          cmsg->cmsg_type == SCM_SECURITY) {
1056
1057                         size_t l;
1058                         l = cmsg->cmsg_len - CMSG_LEN(0);
1059                         memcpy(&bus->label, CMSG_DATA(cmsg), l);
1060                         bus->label[l] = 0;
1061                 }
1062         }
1063
1064         r = message_read_need(bus, &need);
1065         if (r < 0)
1066                 return r;
1067
1068         if (bus->rbuffer_size >= need)
1069                 return message_make(bus, need, m);
1070
1071         return 1;
1072 }
1073
1074 static int dispatch_wqueue(sd_bus *bus) {
1075         int r, ret = 0;
1076
1077         assert(bus);
1078         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1079
1080         if (bus->fd < 0)
1081                 return -ENOTCONN;
1082
1083         while (bus->wqueue_size > 0) {
1084
1085                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1086                 if (r < 0) {
1087                         sd_bus_close(bus);
1088                         return r;
1089                 } else if (r == 0)
1090                         /* Didn't do anything this time */
1091                         return ret;
1092                 else if (bus->windex >= bus->wqueue[0]->size) {
1093                         /* Fully written. Let's drop the entry from
1094                          * the queue.
1095                          *
1096                          * This isn't particularly optimized, but
1097                          * well, this is supposed to be our worst-case
1098                          * buffer only, and the socket buffer is
1099                          * supposed to be our primary buffer, and if
1100                          * it got full, then all bets are off
1101                          * anyway. */
1102
1103                         sd_bus_message_unref(bus->wqueue[0]);
1104                         bus->wqueue_size --;
1105                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1106                         bus->windex = 0;
1107
1108                         ret = 1;
1109                 }
1110         }
1111
1112         return ret;
1113 }
1114
1115 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1116         sd_bus_message *z = NULL;
1117         int r, ret = 0;
1118
1119         assert(bus);
1120         assert(m);
1121         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1122
1123         if (bus->fd < 0)
1124                 return -ENOTCONN;
1125
1126         if (bus->rqueue_size > 0) {
1127                 /* Dispatch a queued message */
1128
1129                 *m = bus->rqueue[0];
1130                 bus->rqueue_size --;
1131                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1132                 return 1;
1133         }
1134
1135         /* Try to read a new message */
1136         do {
1137                 r = message_read(bus, &z);
1138                 if (r < 0) {
1139                         sd_bus_close(bus);
1140                         return r;
1141                 }
1142                 if (r == 0)
1143                         return ret;
1144
1145                 r = 1;
1146         } while (!z);
1147
1148         *m = z;
1149         return 1;
1150 }
1151
1152 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1153         int r;
1154
1155         if (!bus)
1156                 return -EINVAL;
1157         if (bus->fd < 0)
1158                 return -ENOTCONN;
1159         if (!m)
1160                 return -EINVAL;
1161
1162         /* If the serial number isn't kept, then we know that no reply
1163          * is expected */
1164         if (!serial && !m->sealed)
1165                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1166
1167         r = bus_seal_message(bus, m);
1168         if (r < 0)
1169                 return r;
1170
1171         /* If this is a reply and no reply was requested, then let's
1172          * suppress this, if we can */
1173         if (m->dont_send && !serial)
1174                 return 0;
1175
1176         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1177                 size_t idx = 0;
1178
1179                 r = message_write(bus, m, &idx);
1180                 if (r < 0) {
1181                         sd_bus_close(bus);
1182                         return r;
1183                 } else if (idx < m->size)  {
1184                         /* Wasn't fully written. So let's remember how
1185                          * much was written. Note that the first entry
1186                          * of the wqueue array is always allocated so
1187                          * that we always can remember how much was
1188                          * written. */
1189                         bus->wqueue[0] = sd_bus_message_ref(m);
1190                         bus->wqueue_size = 1;
1191                         bus->windex = idx;
1192                 }
1193         } else {
1194                 sd_bus_message **q;
1195
1196                 /* Just append it to the queue. */
1197
1198                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1199                         return -ENOBUFS;
1200
1201                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1202                 if (!q)
1203                         return -ENOMEM;
1204
1205                 bus->wqueue = q;
1206                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1207         }
1208
1209         if (serial)
1210                 *serial = BUS_MESSAGE_SERIAL(m);
1211
1212         return 0;
1213 }
1214
1215 static usec_t calc_elapse(uint64_t usec) {
1216         if (usec == (uint64_t) -1)
1217                 return 0;
1218
1219         if (usec == 0)
1220                 usec = BUS_DEFAULT_TIMEOUT;
1221
1222         return now(CLOCK_MONOTONIC) + usec;
1223 }
1224
1225 static int timeout_compare(const void *a, const void *b) {
1226         const struct reply_callback *x = a, *y = b;
1227
1228         if (x->timeout != 0 && y->timeout == 0)
1229                 return -1;
1230
1231         if (x->timeout == 0 && y->timeout != 0)
1232                 return 1;
1233
1234         if (x->timeout < y->timeout)
1235                 return -1;
1236
1237         if (x->timeout > y->timeout)
1238                 return 1;
1239
1240         return 0;
1241 }
1242
1243 int sd_bus_send_with_reply(
1244                 sd_bus *bus,
1245                 sd_bus_message *m,
1246                 sd_message_handler_t callback,
1247                 void *userdata,
1248                 uint64_t usec,
1249                 uint64_t *serial) {
1250
1251         struct reply_callback *c;
1252         int r;
1253
1254         if (!bus)
1255                 return -EINVAL;
1256         if (bus->fd < 0)
1257                 return -ENOTCONN;
1258         if (!m)
1259                 return -EINVAL;
1260         if (!callback)
1261                 return -EINVAL;
1262         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1263                 return -EINVAL;
1264         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1265                 return -EINVAL;
1266
1267         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1268         if (r < 0)
1269                 return r;
1270
1271         if (usec != (uint64_t) -1) {
1272                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1273                 if (r < 0)
1274                         return r;
1275         }
1276
1277         r = bus_seal_message(bus, m);
1278         if (r < 0)
1279                 return r;
1280
1281         c = new(struct reply_callback, 1);
1282         if (!c)
1283                 return -ENOMEM;
1284
1285         c->callback = callback;
1286         c->userdata = userdata;
1287         c->serial = BUS_MESSAGE_SERIAL(m);
1288         c->timeout = calc_elapse(usec);
1289
1290         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1291         if (r < 0) {
1292                 free(c);
1293                 return r;
1294         }
1295
1296         if (c->timeout != 0) {
1297                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1298                 if (r < 0) {
1299                         c->timeout = 0;
1300                         sd_bus_send_with_reply_cancel(bus, c->serial);
1301                         return r;
1302                 }
1303         }
1304
1305         r = sd_bus_send(bus, m, serial);
1306         if (r < 0) {
1307                 sd_bus_send_with_reply_cancel(bus, c->serial);
1308                 return r;
1309         }
1310
1311         return r;
1312 }
1313
1314 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1315         struct reply_callback *c;
1316
1317         if (!bus)
1318                 return -EINVAL;
1319         if (serial == 0)
1320                 return -EINVAL;
1321
1322         c = hashmap_remove(bus->reply_callbacks, &serial);
1323         if (!c)
1324                 return 0;
1325
1326         if (c->timeout != 0)
1327                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1328
1329         free(c);
1330         return 1;
1331 }
1332
1333 int bus_ensure_running(sd_bus *bus) {
1334         int r;
1335
1336         assert(bus);
1337
1338         if (bus->state == BUS_RUNNING)
1339                 return 1;
1340
1341         for (;;) {
1342                 r = sd_bus_process(bus, NULL);
1343                 if (r < 0)
1344                         return r;
1345                 if (bus->state == BUS_RUNNING)
1346                         return 1;
1347                 if (r > 0)
1348                         continue;
1349
1350                 r = sd_bus_wait(bus, (uint64_t) -1);
1351                 if (r < 0)
1352                         return r;
1353         }
1354 }
1355
1356 int sd_bus_send_with_reply_and_block(
1357                 sd_bus *bus,
1358                 sd_bus_message *m,
1359                 uint64_t usec,
1360                 sd_bus_error *error,
1361                 sd_bus_message **reply) {
1362
1363         int r;
1364         usec_t timeout;
1365         uint64_t serial;
1366         bool room = false;
1367
1368         if (!bus)
1369                 return -EINVAL;
1370         if (bus->fd < 0)
1371                 return -ENOTCONN;
1372         if (!m)
1373                 return -EINVAL;
1374         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1375                 return -EINVAL;
1376         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1377                 return -EINVAL;
1378         if (bus_error_is_dirty(error))
1379                 return -EINVAL;
1380
1381         r = bus_ensure_running(bus);
1382         if (r < 0)
1383                 return r;
1384
1385         r = sd_bus_send(bus, m, &serial);
1386         if (r < 0)
1387                 return r;
1388
1389         timeout = calc_elapse(usec);
1390
1391         for (;;) {
1392                 usec_t left;
1393                 sd_bus_message *incoming = NULL;
1394
1395                 if (!room) {
1396                         sd_bus_message **q;
1397
1398                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1399                                 return -ENOBUFS;
1400
1401                         /* Make sure there's room for queuing this
1402                          * locally, before we read the message */
1403
1404                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1405                         if (!q)
1406                                 return -ENOMEM;
1407
1408                         bus->rqueue = q;
1409                         room = true;
1410                 }
1411
1412                 r = message_read(bus, &incoming);
1413                 if (r < 0)
1414                         return r;
1415                 if (incoming) {
1416
1417                         if (incoming->reply_serial == serial) {
1418                                 /* Found a match! */
1419
1420                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1421                                         *reply = incoming;
1422                                         return 0;
1423                                 }
1424
1425                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1426                                         int k;
1427
1428                                         r = sd_bus_error_copy(error, &incoming->error);
1429                                         if (r < 0) {
1430                                                 sd_bus_message_unref(incoming);
1431                                                 return r;
1432                                         }
1433
1434                                         k = bus_error_to_errno(&incoming->error);
1435                                         sd_bus_message_unref(incoming);
1436                                         return k;
1437                                 }
1438
1439                                 sd_bus_message_unref(incoming);
1440                                 return -EIO;
1441                         }
1442
1443                         /* There's already guaranteed to be room for
1444                          * this, so need to resize things here */
1445                         bus->rqueue[bus->rqueue_size ++] = incoming;
1446                         room = false;
1447
1448                         /* Try to read more, right-away */
1449                         continue;
1450                 }
1451                 if (r != 0)
1452                         continue;
1453
1454                 if (timeout > 0) {
1455                         usec_t n;
1456
1457                         n = now(CLOCK_MONOTONIC);
1458                         if (n >= timeout)
1459                                 return -ETIMEDOUT;
1460
1461                         left = timeout - n;
1462                 } else
1463                         left = (uint64_t) -1;
1464
1465                 r = bus_poll(bus, true, left);
1466                 if (r < 0)
1467                         return r;
1468
1469                 r = dispatch_wqueue(bus);
1470                 if (r < 0)
1471                         return r;
1472         }
1473 }
1474
1475 int sd_bus_get_fd(sd_bus *bus) {
1476         if (!bus)
1477                 return -EINVAL;
1478
1479         if (bus->fd < 0)
1480                 return -ENOTCONN;
1481
1482         return bus->fd;
1483 }
1484
1485 int sd_bus_get_events(sd_bus *bus) {
1486         int flags = 0;
1487
1488         if (!bus)
1489                 return -EINVAL;
1490         if (bus->fd < 0)
1491                 return -ENOTCONN;
1492
1493         if (bus->state == BUS_OPENING)
1494                 flags |= POLLOUT;
1495         else if (bus->state == BUS_AUTHENTICATING) {
1496
1497                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1498                         flags |= POLLOUT;
1499
1500                 flags |= POLLIN;
1501
1502         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1503                 if (bus->rqueue_size <= 0)
1504                         flags |= POLLIN;
1505                 if (bus->wqueue_size > 0)
1506                         flags |= POLLOUT;
1507         }
1508
1509         return flags;
1510 }
1511
1512 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1513         struct reply_callback *c;
1514
1515         if (!bus)
1516                 return -EINVAL;
1517         if (!timeout_usec)
1518                 return -EINVAL;
1519         if (bus->fd < 0)
1520                 return -ENOTCONN;
1521
1522         if (bus->state == BUS_AUTHENTICATING) {
1523                 *timeout_usec = bus->auth_timeout;
1524                 return 1;
1525         }
1526
1527         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1528                 return 0;
1529
1530         c = prioq_peek(bus->reply_callbacks_prioq);
1531         if (!c)
1532                 return 0;
1533
1534         *timeout_usec = c->timeout;
1535         return 1;
1536 }
1537
1538 static int process_timeout(sd_bus *bus) {
1539         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1540         struct reply_callback *c;
1541         usec_t n;
1542         int r;
1543
1544         assert(bus);
1545
1546         c = prioq_peek(bus->reply_callbacks_prioq);
1547         if (!c)
1548                 return 0;
1549
1550         n = now(CLOCK_MONOTONIC);
1551         if (c->timeout > n)
1552                 return 0;
1553
1554         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1555         hashmap_remove(bus->reply_callbacks, &c->serial);
1556
1557         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1558         free(c);
1559
1560         return r < 0 ? r : 1;
1561 }
1562
1563 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1564         struct reply_callback *c;
1565         int r;
1566
1567         assert(bus);
1568         assert(m);
1569
1570         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1571             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1572                 return 0;
1573
1574         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1575         if (!c)
1576                 return 0;
1577
1578         if (c->timeout != 0)
1579                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1580
1581         r = c->callback(bus, 0, m, c->userdata);
1582         free(c);
1583
1584         return r;
1585 }
1586
1587 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1588         struct filter_callback *l;
1589         int r;
1590
1591         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1592                 r = l->callback(bus, 0, m, l->userdata);
1593                 if (r != 0)
1594                         return r;
1595         }
1596
1597         return 0;
1598 }
1599
1600 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1601         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1602         int r;
1603
1604         assert(bus);
1605         assert(m);
1606
1607         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1608                 return 0;
1609
1610         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1611                 return 0;
1612
1613         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1614                 return 1;
1615
1616         if (streq_ptr(m->member, "Ping"))
1617                 r = sd_bus_message_new_method_return(bus, m, &reply);
1618         else if (streq_ptr(m->member, "GetMachineId")) {
1619                 sd_id128_t id;
1620                 char sid[33];
1621
1622                 r = sd_id128_get_machine(&id);
1623                 if (r < 0)
1624                         return r;
1625
1626                 r = sd_bus_message_new_method_return(bus, m, &reply);
1627                 if (r < 0)
1628                         return r;
1629
1630                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1631         } else {
1632                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1633
1634                 sd_bus_error_set(&error,
1635                                  "org.freedesktop.DBus.Error.UnknownMethod",
1636                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1637
1638                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1639         }
1640
1641         if (r < 0)
1642                 return r;
1643
1644         r = sd_bus_send(bus, reply, NULL);
1645         if (r < 0)
1646                 return r;
1647
1648         return 1;
1649 }
1650
1651 static int process_object(sd_bus *bus, sd_bus_message *m) {
1652         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1653         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1654         struct object_callback *c;
1655         char *p;
1656         int r;
1657         bool found = false;
1658
1659         assert(bus);
1660         assert(m);
1661
1662         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1663                 return 0;
1664
1665         if (hashmap_isempty(bus->object_callbacks))
1666                 return 0;
1667
1668         c = hashmap_get(bus->object_callbacks, m->path);
1669         if (c) {
1670                 r = c->callback(bus, 0, m, c->userdata);
1671                 if (r != 0)
1672                         return r;
1673
1674                 found = true;
1675         }
1676
1677         /* Look for fallback prefixes */
1678         p = strdupa(m->path);
1679         for (;;) {
1680                 char *e;
1681
1682                 e = strrchr(p, '/');
1683                 if (e == p || !e)
1684                         break;
1685
1686                 *e = 0;
1687
1688                 c = hashmap_get(bus->object_callbacks, p);
1689                 if (c && c->is_fallback) {
1690                         r = c->callback(bus, 0, m, c->userdata);
1691                         if (r != 0)
1692                                 return r;
1693
1694                         found = true;
1695                 }
1696         }
1697
1698         if (!found)
1699                 return 0;
1700
1701         sd_bus_error_set(&error,
1702                          "org.freedesktop.DBus.Error.UnknownMethod",
1703                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1704
1705         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1706         if (r < 0)
1707                 return r;
1708
1709         r = sd_bus_send(bus, reply, NULL);
1710         if (r < 0)
1711                 return r;
1712
1713         return 1;
1714 }
1715
1716 static int process_message(sd_bus *bus, sd_bus_message *m) {
1717         int r;
1718
1719         assert(bus);
1720         assert(m);
1721
1722         r = process_reply(bus, m);
1723         if (r != 0)
1724                 return r;
1725
1726         r = process_filter(bus, m);
1727         if (r != 0)
1728                 return r;
1729
1730         r = process_builtin(bus, m);
1731         if (r != 0)
1732                 return r;
1733
1734         return process_object(bus, m);
1735 }
1736
1737 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1738         int r;
1739
1740         /* Returns 0 when we didn't do anything. This should cause the
1741          * caller to invoke sd_bus_wait() before returning the next
1742          * time. Returns > 0 when we did something, which possibly
1743          * means *ret is filled in with an unprocessed message. */
1744
1745         if (!bus)
1746                 return -EINVAL;
1747         if (bus->fd < 0)
1748                 return -ENOTCONN;
1749
1750         if (bus->state == BUS_OPENING) {
1751                 struct pollfd p;
1752
1753                 zero(p);
1754                 p.fd = bus->fd;
1755                 p.events = POLLOUT;
1756
1757                 r = poll(&p, 1, 0);
1758                 if (r < 0)
1759                         return -errno;
1760
1761                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1762                         int error = 0;
1763                         socklen_t slen = sizeof(error);
1764
1765                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1766                         if (r < 0)
1767                                 bus->last_connect_error = errno;
1768                         else if (error != 0)
1769                                 bus->last_connect_error = error;
1770                         else if (p.revents & (POLLERR|POLLHUP))
1771                                 bus->last_connect_error = ECONNREFUSED;
1772                         else {
1773                                 r = bus_start_auth(bus);
1774                                 goto null_message;
1775                         }
1776
1777                         /* Try next address */
1778                         r = bus_start_connect(bus);
1779                         goto null_message;
1780                 }
1781
1782                 r = 0;
1783                 goto null_message;
1784
1785         } else if (bus->state == BUS_AUTHENTICATING) {
1786
1787                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1788                         return -ETIMEDOUT;
1789
1790                 r = bus_write_auth(bus);
1791                 if (r != 0)
1792                         goto null_message;
1793
1794                 r = bus_read_auth(bus);
1795                 goto null_message;
1796
1797         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1798                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1799                 int k;
1800
1801                 r = process_timeout(bus);
1802                 if (r != 0)
1803                         goto null_message;
1804
1805                 r = dispatch_wqueue(bus);
1806                 if (r != 0)
1807                         goto null_message;
1808
1809                 k = r;
1810                 r = dispatch_rqueue(bus, &m);
1811                 if (r < 0)
1812                         return r;
1813                 if (!m) {
1814                         if (r == 0)
1815                                 r = k;
1816                         goto null_message;
1817                 }
1818
1819                 r = process_message(bus, m);
1820                 if (r != 0)
1821                         goto null_message;
1822
1823                 if (ret) {
1824                         *ret = m;
1825                         m = NULL;
1826                         return 1;
1827                 }
1828
1829                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1830                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1831                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1832
1833                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1834
1835                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1836                         if (r < 0)
1837                                 return r;
1838
1839                         r = sd_bus_send(bus, reply, NULL);
1840                         if (r < 0)
1841                                 return r;
1842                 }
1843
1844                 return 1;
1845         }
1846
1847         assert_not_reached("Unknown state");
1848
1849 null_message:
1850         if (r >= 0 && ret)
1851                 *ret = NULL;
1852
1853         return r;
1854 }
1855
1856 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1857         struct pollfd p;
1858         int r, e;
1859         struct timespec ts;
1860         usec_t until, m;
1861
1862         assert(bus);
1863
1864         if (bus->fd < 0)
1865                 return -ENOTCONN;
1866
1867         e = sd_bus_get_events(bus);
1868         if (e < 0)
1869                 return e;
1870
1871         if (need_more)
1872                 e |= POLLIN;
1873
1874         r = sd_bus_get_timeout(bus, &until);
1875         if (r < 0)
1876                 return r;
1877         if (r == 0)
1878                 m = (uint64_t) -1;
1879         else {
1880                 usec_t n;
1881                 n = now(CLOCK_MONOTONIC);
1882                 m = until > n ? until - n : 0;
1883         }
1884
1885         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1886                 m = timeout_usec;
1887
1888         zero(p);
1889         p.fd = bus->fd;
1890         p.events = e;
1891
1892         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1893         if (r < 0)
1894                 return -errno;
1895
1896         return r > 0 ? 1 : 0;
1897 }
1898
1899 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1900
1901         if (!bus)
1902                 return -EINVAL;
1903         if (bus->fd < 0)
1904                 return -ENOTCONN;
1905         if (bus->rqueue_size > 0)
1906                 return 0;
1907
1908         return bus_poll(bus, false, timeout_usec);
1909 }
1910
1911 int sd_bus_flush(sd_bus *bus) {
1912         int r;
1913
1914         if (!bus)
1915                 return -EINVAL;
1916         if (bus->fd < 0)
1917                 return -ENOTCONN;
1918
1919         r = bus_ensure_running(bus);
1920         if (r < 0)
1921                 return r;
1922
1923         if (bus->wqueue_size <= 0)
1924                 return 0;
1925
1926         for (;;) {
1927                 r = dispatch_wqueue(bus);
1928                 if (r < 0)
1929                         return r;
1930
1931                 if (bus->wqueue_size <= 0)
1932                         return 0;
1933
1934                 r = bus_poll(bus, false, (uint64_t) -1);
1935                 if (r < 0)
1936                         return r;
1937         }
1938 }
1939
1940 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1941         struct filter_callback *f;
1942
1943         if (!bus)
1944                 return -EINVAL;
1945         if (!callback)
1946                 return -EINVAL;
1947
1948         f = new(struct filter_callback, 1);
1949         if (!f)
1950                 return -ENOMEM;
1951         f->callback = callback;
1952         f->userdata = userdata;
1953
1954         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1955         return 0;
1956 }
1957
1958 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1959         struct filter_callback *f;
1960
1961         if (!bus)
1962                 return -EINVAL;
1963         if (!callback)
1964                 return -EINVAL;
1965
1966         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1967                 if (f->callback == callback && f->userdata == userdata) {
1968                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1969                         free(f);
1970                         return 1;
1971                 }
1972         }
1973
1974         return 0;
1975 }
1976
1977 static int bus_add_object(
1978                 sd_bus *bus,
1979                 bool fallback,
1980                 const char *path,
1981                 sd_message_handler_t callback,
1982                 void *userdata) {
1983
1984         struct object_callback *c;
1985         int r;
1986
1987         if (!bus)
1988                 return -EINVAL;
1989         if (!path)
1990                 return -EINVAL;
1991         if (!callback)
1992                 return -EINVAL;
1993
1994         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1995         if (r < 0)
1996                 return r;
1997
1998         c = new(struct object_callback, 1);
1999         if (!c)
2000                 return -ENOMEM;
2001
2002         c->path = strdup(path);
2003         if (!path) {
2004                 free(c);
2005                 return -ENOMEM;
2006         }
2007
2008         c->callback = callback;
2009         c->userdata = userdata;
2010         c->is_fallback = fallback;
2011
2012         r = hashmap_put(bus->object_callbacks, c->path, c);
2013         if (r < 0) {
2014                 free(c->path);
2015                 free(c);
2016                 return r;
2017         }
2018
2019         return 0;
2020 }
2021
2022 static int bus_remove_object(
2023                 sd_bus *bus,
2024                 bool fallback,
2025                 const char *path,
2026                 sd_message_handler_t callback,
2027                 void *userdata) {
2028
2029         struct object_callback *c;
2030
2031         if (!bus)
2032                 return -EINVAL;
2033         if (!path)
2034                 return -EINVAL;
2035         if (!callback)
2036                 return -EINVAL;
2037
2038         c = hashmap_get(bus->object_callbacks, path);
2039         if (!c)
2040                 return 0;
2041
2042         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2043                 return 0;
2044
2045         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2046
2047         free(c->path);
2048         free(c);
2049
2050         return 1;
2051 }
2052
2053 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2054         return bus_add_object(bus, false, path, callback, userdata);
2055 }
2056
2057 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2058         return bus_remove_object(bus, false, path, callback, userdata);
2059 }
2060
2061 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2062         return bus_add_object(bus, true, prefix, callback, userdata);
2063 }
2064
2065 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2066         return bus_remove_object(bus, true, prefix, callback, userdata);
2067 }