chiark / gitweb /
service: no need to drop rc. prefix anymore
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "missing.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         struct object_callback *c;
44         unsigned i;
45
46         assert(b);
47
48         if (b->fd >= 0)
49                 close_nointr_nofail(b->fd);
50
51         free(b->rbuffer);
52         free(b->unique_name);
53         free(b->auth_uid);
54         free(b->address);
55
56         for (i = 0; i < b->rqueue_size; i++)
57                 sd_bus_message_unref(b->rqueue[i]);
58         free(b->rqueue);
59
60         for (i = 0; i < b->wqueue_size; i++)
61                 sd_bus_message_unref(b->wqueue[i]);
62         free(b->wqueue);
63
64         hashmap_free_free(b->reply_callbacks);
65         prioq_free(b->reply_callbacks_prioq);
66
67         while ((f = b->filter_callbacks)) {
68                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
69                 free(f);
70         }
71
72         while ((c = hashmap_steal_first(b->object_callbacks))) {
73                 free(c->path);
74                 free(c);
75         }
76
77         hashmap_free(b->object_callbacks);
78
79         free(b);
80 }
81
82 static sd_bus* bus_new(void) {
83         sd_bus *r;
84
85         r = new0(sd_bus, 1);
86         if (!r)
87                 return NULL;
88
89         r->n_ref = 1;
90         r->fd = -1;
91         r->message_version = 1;
92
93         /* We guarantee that wqueue always has space for at least one
94          * entry */
95         r->wqueue = new(sd_bus_message*, 1);
96         if (!r->wqueue) {
97                 free(r);
98                 return NULL;
99         }
100
101         return r;
102 };
103
104 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
105         const char *s;
106         int r;
107
108         assert(bus);
109
110         if (error != 0)
111                 return -error;
112
113         assert(reply);
114
115         r = sd_bus_message_read(reply, "s", &s);
116         if (r < 0)
117                 return r;
118
119         if (!service_name_is_valid(s) || s[0] != ':')
120                 return -EBADMSG;
121
122         bus->unique_name = strdup(s);
123         if (!bus->unique_name)
124                 return -ENOMEM;
125
126         bus->state = BUS_RUNNING;
127
128         return 1;
129 }
130
131 static int bus_send_hello(sd_bus *bus) {
132         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
133         int r;
134
135         assert(bus);
136
137         r = sd_bus_message_new_method_call(
138                         bus,
139                         "org.freedesktop.DBus",
140                         "/",
141                         "org.freedesktop.DBus",
142                         "Hello",
143                         &m);
144         if (r < 0)
145                 return r;
146
147         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
148         if (r < 0)
149                 return r;
150
151         bus->sent_hello = true;
152         return r;
153 }
154
155 static int bus_start_running(sd_bus *bus) {
156         assert(bus);
157
158         if (bus->sent_hello) {
159                 bus->state = BUS_HELLO;
160                 return 1;
161         }
162
163         bus->state = BUS_RUNNING;
164         return 1;
165 }
166
167 static int parse_address_key(const char **p, const char *key, char **value) {
168         size_t l, n = 0;
169         const char *a;
170         char *r = NULL;
171
172         assert(p);
173         assert(*p);
174         assert(key);
175         assert(value);
176
177         l = strlen(key);
178         if (strncmp(*p, key, l) != 0)
179                 return 0;
180
181         if ((*p)[l] != '=')
182                 return 0;
183
184         if (*value)
185                 return -EINVAL;
186
187         a = *p + l + 1;
188         while (*a != ',' && *a != 0) {
189                 char c, *t;
190
191                 if (*a == '%') {
192                         int x, y;
193
194                         x = unhexchar(a[1]);
195                         if (x < 0) {
196                                 free(r);
197                                 return x;
198                         }
199
200                         y = unhexchar(a[2]);
201                         if (y < 0) {
202                                 free(r);
203                                 return y;
204                         }
205
206                         c = (char) ((x << 4) | y);
207                         a += 3;
208                 } else {
209                         c = *a;
210                         a++;
211                 }
212
213                 t = realloc(r, n + 2);
214                 if (!t) {
215                         free(r);
216                         return -ENOMEM;
217                 }
218
219                 r = t;
220                 r[n++] = c;
221         }
222
223         if (!r) {
224                 r = strdup("");
225                 if (!r)
226                         return -ENOMEM;
227         } else
228                 r[n] = 0;
229
230         if (*a == ',')
231                 a++;
232
233         *p = a;
234         *value = r;
235         return 1;
236 }
237
238 static void skip_address_key(const char **p) {
239         assert(p);
240         assert(*p);
241
242         *p += strcspn(*p, ",");
243
244         if (**p == ',')
245                 (*p) ++;
246 }
247
248 static int bus_parse_next_address(sd_bus *b) {
249         const char *a, *p;
250         _cleanup_free_ char *guid = NULL;
251         int r;
252
253         assert(b);
254
255         if (!b->address)
256                 return 0;
257         if (b->address[b->address_index] == 0)
258                 return 0;
259
260         a = b->address + b->address_index;
261
262         zero(b->sockaddr);
263         b->sockaddr_size = 0;
264         b->peer = SD_ID128_NULL;
265
266         if (startswith(a, "unix:")) {
267                 _cleanup_free_ char *path = NULL, *abstract = NULL;
268
269                 p = a + 5;
270                 while (*p != 0) {
271                         r = parse_address_key(&p, "guid", &guid);
272                         if (r < 0)
273                                 return r;
274                         else if (r > 0)
275                                 continue;
276
277                         r = parse_address_key(&p, "path", &path);
278                         if (r < 0)
279                                 return r;
280                         else if (r > 0)
281                                 continue;
282
283                         r = parse_address_key(&p, "abstract", &abstract);
284                         if (r < 0)
285                                 return r;
286                         else if (r > 0)
287                                 continue;
288
289                         skip_address_key(&p);
290                 }
291
292                 if (!path && !abstract)
293                         return -EINVAL;
294
295                 if (path && abstract)
296                         return -EINVAL;
297
298                 if (path) {
299                         size_t l;
300
301                         l = strlen(path);
302                         if (l > sizeof(b->sockaddr.un.sun_path))
303                                 return -E2BIG;
304
305                         b->sockaddr.un.sun_family = AF_UNIX;
306                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
307                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
308                 } else if (abstract) {
309                         size_t l;
310
311                         l = strlen(abstract);
312                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
313                                 return -E2BIG;
314
315                         b->sockaddr.un.sun_family = AF_UNIX;
316                         b->sockaddr.un.sun_path[0] = 0;
317                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
318                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
319                 }
320
321         } else if (startswith(a, "tcp:")) {
322                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
323                 struct addrinfo hints, *result;
324
325                 p = a + 4;
326                 while (*p != 0) {
327                         r = parse_address_key(&p, "guid", &guid);
328                         if (r < 0)
329                                 return r;
330                         else if (r > 0)
331                                 continue;
332
333                         r = parse_address_key(&p, "host", &host);
334                         if (r < 0)
335                                 return r;
336                         else if (r > 0)
337                                 continue;
338
339                         r = parse_address_key(&p, "port", &port);
340                         if (r < 0)
341                                 return r;
342                         else if (r > 0)
343                                 continue;
344
345                         r = parse_address_key(&p, "family", &family);
346                         if (r < 0)
347                                 return r;
348                         else if (r > 0)
349                                 continue;
350
351                         skip_address_key(&p);
352                 }
353
354                 if (!host || !port)
355                         return -EINVAL;
356
357                 zero(hints);
358                 hints.ai_socktype = SOCK_STREAM;
359                 hints.ai_flags = AI_ADDRCONFIG;
360
361                 if (family) {
362                         if (streq(family, "ipv4"))
363                                 hints.ai_family = AF_INET;
364                         else if (streq(family, "ipv6"))
365                                 hints.ai_family = AF_INET6;
366                         else
367                                 return -EINVAL;
368                 }
369
370                 r = getaddrinfo(host, port, &hints, &result);
371                 if (r == EAI_SYSTEM)
372                         return -errno;
373                 else if (r != 0)
374                         return -EADDRNOTAVAIL;
375
376                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
377                 b->sockaddr_size = result->ai_addrlen;
378
379                 freeaddrinfo(result);
380         }
381
382         if (guid) {
383                 r = sd_id128_from_string(guid, &b->peer);
384                 if (r < 0)
385                         return r;
386         }
387
388         b->address_index = p - b->address;
389         return 1;
390 }
391
392 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
393
394         while (size > 0) {
395                 struct iovec *i = iov + *idx;
396
397                 if (i->iov_len > size) {
398                         i->iov_base = (uint8_t*) i->iov_base + size;
399                         i->iov_len -= size;
400                         return;
401                 }
402
403                 size -= i->iov_len;
404
405                 i->iov_base = NULL;
406                 i->iov_len = 0;
407
408                 (*idx) ++;
409         }
410 }
411
412 static int bus_write_auth(sd_bus *b) {
413         struct msghdr mh;
414         ssize_t k;
415
416         assert(b);
417         assert(b->state == BUS_AUTHENTICATING);
418
419         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
420                 return 0;
421
422         if (b->auth_timeout == 0)
423                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
424
425         zero(mh);
426         mh.msg_iov = b->auth_iovec + b->auth_index;
427         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
428
429         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
430         if (k < 0)
431                 return errno == EAGAIN ? 0 : -errno;
432
433         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
434
435         return 1;
436 }
437
438 static int bus_auth_verify(sd_bus *b) {
439         char *e, *f;
440         sd_id128_t peer;
441         unsigned i;
442         int r;
443
444         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
445          * that's it */
446
447         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
448         if (!e)
449                 return 0;
450
451         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
452         if (!f)
453                 return 0;
454
455         if (e - (char*) b->rbuffer != 3 + 32)
456                 return -EPERM;
457
458         if (memcmp(b->rbuffer, "OK ", 3))
459                 return -EPERM;
460
461         for (i = 0; i < 32; i += 2) {
462                 int x, y;
463
464                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
465                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
466
467                 if (x < 0 || y < 0)
468                         return -EINVAL;
469
470                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
471         }
472
473         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
474             !sd_id128_equal(b->peer, peer))
475                 return -EPERM;
476
477         b->peer = peer;
478
479         b->can_fds =
480                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
481                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
482
483         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
484         memmove(b->rbuffer, f + 2, b->rbuffer_size);
485
486         r = bus_start_running(b);
487         if (r < 0)
488                 return r;
489
490         return 1;
491 }
492
493 static int bus_read_auth(sd_bus *b) {
494         struct msghdr mh;
495         struct iovec iov;
496         size_t n;
497         ssize_t k;
498         int r;
499         void *p;
500
501         assert(b);
502
503         r = bus_auth_verify(b);
504         if (r != 0)
505                 return r;
506
507         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
508
509         if (n > BUS_AUTH_SIZE_MAX)
510                 n = BUS_AUTH_SIZE_MAX;
511
512         if (b->rbuffer_size >= n)
513                 return -ENOBUFS;
514
515         p = realloc(b->rbuffer, n);
516         if (!p)
517                 return -ENOMEM;
518
519         b->rbuffer = p;
520
521         zero(iov);
522         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
523         iov.iov_len = n - b->rbuffer_size;
524
525         zero(mh);
526         mh.msg_iov = &iov;
527         mh.msg_iovlen = 1;
528
529         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
530         if (k < 0)
531                 return errno == EAGAIN ? 0 : -errno;
532
533         b->rbuffer_size += k;
534
535         r = bus_auth_verify(b);
536         if (r != 0)
537                 return r;
538
539         return 1;
540 }
541
542 static int bus_setup_fd(sd_bus *b) {
543         int one;
544
545         assert(b);
546
547         /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
548          * just in case. This is actually irrelavant for */
549         one = 1;
550         setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
551         setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
552
553         /* Increase the buffers to a MB */
554         fd_inc_rcvbuf(b->fd, 1024*1024);
555         fd_inc_sndbuf(b->fd, 1024*1024);
556
557         return 0;
558 }
559
560 static int bus_start_auth(sd_bus *b) {
561         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
562         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
563
564         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
565         size_t l;
566
567         assert(b);
568
569         b->state = BUS_AUTHENTICATING;
570
571         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
572         char_array_0(text);
573
574         l = strlen(text);
575         b->auth_uid = hexmem(text, l);
576         if (!b->auth_uid)
577                 return -ENOMEM;
578
579         b->auth_iovec[0].iov_base = (void*) auth_prefix;
580         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
581         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
582         b->auth_iovec[1].iov_len = l * 2;
583         b->auth_iovec[2].iov_base = (void*) auth_suffix;
584         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
585         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
586
587         return bus_write_auth(b);
588 }
589
590 static int bus_start_connect(sd_bus *b) {
591         int r;
592
593         assert(b);
594         assert(b->fd < 0);
595
596         for (;;) {
597                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
598                         r = bus_parse_next_address(b);
599                         if (r < 0)
600                                 return r;
601                         if (r == 0)
602                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
603                 }
604
605                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
606                 if (b->fd < 0) {
607                         b->last_connect_error = errno;
608                         goto try_again;
609                 }
610
611                 r = bus_setup_fd(b);
612                 if (r < 0) {
613                         b->last_connect_error = errno;
614                         goto try_again;
615                 }
616
617                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
618                 if (r < 0) {
619                         if (errno == EINPROGRESS)
620                                 return 1;
621
622                         b->last_connect_error = errno;
623                         goto try_again;
624                 }
625
626                 return bus_start_auth(b);
627
628         try_again:
629                 zero(b->sockaddr);
630
631                 if (b->fd >= 0) {
632                         close_nointr_nofail(b->fd);
633                         b->fd = -1;
634                 }
635         }
636 }
637
638 int sd_bus_open_system(sd_bus **ret) {
639         const char *e;
640         sd_bus *b;
641         int r;
642
643         if (!ret)
644                 return -EINVAL;
645
646         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
647         if (e) {
648                 r = sd_bus_open_address(e, &b);
649                 if (r < 0)
650                         return r;
651         } else {
652                 b = bus_new();
653                 if (!b)
654                         return -ENOMEM;
655
656                 b->sockaddr.un.sun_family = AF_UNIX;
657                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
658                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
659
660                 r = bus_start_connect(b);
661                 if (r < 0) {
662                         bus_free(b);
663                         return r;
664                 }
665         }
666
667         r = bus_send_hello(b);
668         if (r < 0) {
669                 sd_bus_unref(b);
670                 return r;
671         }
672
673         *ret = b;
674         return 0;
675 }
676
677 int sd_bus_open_user(sd_bus **ret) {
678         const char *e;
679         sd_bus *b;
680         size_t l;
681         int r;
682
683         if (!ret)
684                 return -EINVAL;
685
686         e = getenv("DBUS_SESSION_BUS_ADDRESS");
687         if (e) {
688                 r = sd_bus_open_address(e, &b);
689                 if (r < 0)
690                         return r;
691         } else {
692                 e = getenv("XDG_RUNTIME_DIR");
693                 if (!e)
694                         return -ENOENT;
695
696                 l = strlen(e);
697                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
698                         return -E2BIG;
699
700                 b = bus_new();
701                 if (!b)
702                         return -ENOMEM;
703
704                 b->sockaddr.un.sun_family = AF_UNIX;
705                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
706                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
707
708                 r = bus_start_connect(b);
709                 if (r < 0) {
710                         bus_free(b);
711                         return r;
712                 }
713         }
714
715         r = bus_send_hello(b);
716         if (r < 0) {
717                 sd_bus_unref(b);
718                 return r;
719         }
720
721         *ret = b;
722         return 0;
723 }
724
725 int sd_bus_open_address(const char *address, sd_bus **ret) {
726         sd_bus *b;
727         int r;
728
729         if (!address)
730                 return -EINVAL;
731         if (!ret)
732                 return -EINVAL;
733
734         b = bus_new();
735         if (!b)
736                 return -ENOMEM;
737
738         b->address = strdup(address);
739         if (!b->address) {
740                 bus_free(b);
741                 return -ENOMEM;
742         }
743
744         r = bus_start_connect(b);
745         if (r < 0) {
746                 bus_free(b);
747                 return r;
748         }
749
750         *ret = b;
751         return 0;
752 }
753
754 int sd_bus_open_fd(int fd, sd_bus **ret) {
755         sd_bus *b;
756         int r;
757
758         if (fd < 0)
759                 return -EINVAL;
760         if (!ret)
761                 return -EINVAL;
762
763         b = bus_new();
764         if (!b)
765                 return -ENOMEM;
766
767         b->fd = fd;
768
769         r = fd_nonblock(b->fd, true);
770         if (r < 0)
771                 goto fail;
772
773         fd_cloexec(b->fd, true);
774         if (r < 0)
775                 goto fail;
776
777         r = bus_setup_fd(b);
778         if (r < 0)
779                 goto fail;
780
781         r = bus_start_auth(b);
782         if (r < 0)
783                 goto fail;
784
785         *ret = b;
786         return 0;
787
788 fail:
789                 bus_free(b);
790         return r;
791 }
792
793 void sd_bus_close(sd_bus *bus) {
794         if (!bus)
795                 return;
796         if (bus->fd < 0)
797                 return;
798
799         close_nointr_nofail(bus->fd);
800         bus->fd = -1;
801 }
802
803 sd_bus *sd_bus_ref(sd_bus *bus) {
804         if (!bus)
805                 return NULL;
806
807         assert(bus->n_ref > 0);
808
809         bus->n_ref++;
810         return bus;
811 }
812
813 sd_bus *sd_bus_unref(sd_bus *bus) {
814         if (!bus)
815                 return NULL;
816
817         assert(bus->n_ref > 0);
818         bus->n_ref--;
819
820         if (bus->n_ref <= 0)
821                 bus_free(bus);
822
823         return NULL;
824 }
825
826 int sd_bus_is_open(sd_bus *bus) {
827         if (!bus)
828                 return -EINVAL;
829
830         return bus->fd >= 0;
831 }
832
833 int sd_bus_can_send(sd_bus *bus, char type) {
834         int r;
835
836         if (!bus)
837                 return -EINVAL;
838
839         if (type == SD_BUS_TYPE_UNIX_FD) {
840                 r = bus_ensure_running(bus);
841                 if (r < 0)
842                         return r;
843
844                 return bus->can_fds;
845         }
846
847         return bus_type_is_valid(type);
848 }
849
850 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
851         int r;
852
853         if (!bus)
854                 return -EINVAL;
855         if (!peer)
856                 return -EINVAL;
857
858         r = bus_ensure_running(bus);
859         if (r < 0)
860                 return r;
861
862         *peer = bus->peer;
863         return 0;
864 }
865
866 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
867         assert(m);
868
869         if (m->header->version > b->message_version)
870                 return -EPERM;
871
872         if (m->sealed)
873                 return 0;
874
875         return bus_message_seal(m, ++b->serial);
876 }
877
878 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
879         struct msghdr mh;
880         struct iovec *iov;
881         ssize_t k;
882         size_t n;
883         unsigned j;
884
885         assert(bus);
886         assert(m);
887         assert(idx);
888         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
889
890         if (*idx >= m->size)
891                 return 0;
892
893         n = m->n_iovec * sizeof(struct iovec);
894         iov = alloca(n);
895         memcpy(iov, m->iovec, n);
896
897         j = 0;
898         iovec_advance(iov, &j, *idx);
899
900         zero(mh);
901         mh.msg_iov = iov;
902         mh.msg_iovlen = m->n_iovec;
903
904         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
905         if (k < 0)
906                 return errno == EAGAIN ? 0 : -errno;
907
908         *idx += (size_t) k;
909         return 1;
910 }
911
912 static int message_read_need(sd_bus *bus, size_t *need) {
913         uint32_t a, b;
914         uint8_t e;
915         uint64_t sum;
916
917         assert(bus);
918         assert(need);
919         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
920
921         if (bus->rbuffer_size < sizeof(struct bus_header)) {
922                 *need = sizeof(struct bus_header) + 8;
923
924                 /* Minimum message size:
925                  *
926                  * Header +
927                  *
928                  *  Method Call: +2 string headers
929                  *       Signal: +3 string headers
930                  * Method Error: +1 string headers
931                  *               +1 uint32 headers
932                  * Method Reply: +1 uint32 headers
933                  *
934                  * A string header is at least 9 bytes
935                  * A uint32 header is at least 8 bytes
936                  *
937                  * Hence the minimum message size of a valid message
938                  * is header + 8 bytes */
939
940                 return 0;
941         }
942
943         a = ((const uint32_t*) bus->rbuffer)[1];
944         b = ((const uint32_t*) bus->rbuffer)[3];
945
946         e = ((const uint8_t*) bus->rbuffer)[0];
947         if (e == SD_BUS_LITTLE_ENDIAN) {
948                 a = le32toh(a);
949                 b = le32toh(b);
950         } else if (e == SD_BUS_BIG_ENDIAN) {
951                 a = be32toh(a);
952                 b = be32toh(b);
953         } else
954                 return -EBADMSG;
955
956         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
957         if (sum >= BUS_MESSAGE_SIZE_MAX)
958                 return -ENOBUFS;
959
960         *need = (size_t) sum;
961         return 0;
962 }
963
964 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
965         sd_bus_message *t;
966         void *b = NULL;
967         int r;
968
969         assert(bus);
970         assert(m);
971         assert(bus->rbuffer_size >= size);
972         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
973
974         if (bus->rbuffer_size > size) {
975                 b = memdup((const uint8_t*) bus->rbuffer + size,
976                            bus->rbuffer_size - size);
977                 if (!b)
978                         return -ENOMEM;
979         }
980
981         r = bus_message_from_malloc(bus->rbuffer, size,
982                                     bus->ucred_valid ? &bus->ucred : NULL,
983                                     bus->label[0] ? bus->label : NULL, &t);
984         if (r < 0) {
985                 free(b);
986                 return r;
987         }
988
989         bus->rbuffer = b;
990         bus->rbuffer_size -= size;
991
992         *m = t;
993         return 1;
994 }
995
996 static int message_read(sd_bus *bus, sd_bus_message **m) {
997         struct msghdr mh;
998         struct iovec iov;
999         ssize_t k;
1000         size_t need;
1001         int r;
1002         void *b;
1003         union {
1004                 struct cmsghdr cmsghdr;
1005                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1006                             CMSG_SPACE(NAME_MAX)]; /*selinux label */
1007         } control;
1008         struct cmsghdr *cmsg;
1009
1010         assert(bus);
1011         assert(m);
1012         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1013
1014         r = message_read_need(bus, &need);
1015         if (r < 0)
1016                 return r;
1017
1018         if (bus->rbuffer_size >= need)
1019                 return message_make(bus, need, m);
1020
1021         b = realloc(bus->rbuffer, need);
1022         if (!b)
1023                 return -ENOMEM;
1024
1025         bus->rbuffer = b;
1026
1027         zero(iov);
1028         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1029         iov.iov_len = need - bus->rbuffer_size;
1030
1031         zero(mh);
1032         mh.msg_iov = &iov;
1033         mh.msg_iovlen = 1;
1034         mh.msg_control = &control;
1035         mh.msg_controllen = sizeof(control);
1036
1037         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1038         if (k < 0)
1039                 return errno == EAGAIN ? 0 : -errno;
1040
1041         bus->rbuffer_size += k;
1042         bus->ucred_valid = false;
1043         bus->label[0] = 0;
1044
1045         for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1046                 if (cmsg->cmsg_level == SOL_SOCKET &&
1047                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1048                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1049
1050                         memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1051                         bus->ucred_valid = true;
1052
1053                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1054                          cmsg->cmsg_type == SCM_SECURITY) {
1055
1056                         size_t l;
1057                         l = cmsg->cmsg_len - CMSG_LEN(0);
1058                         memcpy(&bus->label, CMSG_DATA(cmsg), l);
1059                         bus->label[l] = 0;
1060                 }
1061         }
1062
1063         r = message_read_need(bus, &need);
1064         if (r < 0)
1065                 return r;
1066
1067         if (bus->rbuffer_size >= need)
1068                 return message_make(bus, need, m);
1069
1070         return 1;
1071 }
1072
1073 static int dispatch_wqueue(sd_bus *bus) {
1074         int r, ret = 0;
1075
1076         assert(bus);
1077         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1078
1079         if (bus->fd < 0)
1080                 return -ENOTCONN;
1081
1082         while (bus->wqueue_size > 0) {
1083
1084                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1085                 if (r < 0) {
1086                         sd_bus_close(bus);
1087                         return r;
1088                 } else if (r == 0)
1089                         /* Didn't do anything this time */
1090                         return ret;
1091                 else if (bus->windex >= bus->wqueue[0]->size) {
1092                         /* Fully written. Let's drop the entry from
1093                          * the queue.
1094                          *
1095                          * This isn't particularly optimized, but
1096                          * well, this is supposed to be our worst-case
1097                          * buffer only, and the socket buffer is
1098                          * supposed to be our primary buffer, and if
1099                          * it got full, then all bets are off
1100                          * anyway. */
1101
1102                         sd_bus_message_unref(bus->wqueue[0]);
1103                         bus->wqueue_size --;
1104                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1105                         bus->windex = 0;
1106
1107                         ret = 1;
1108                 }
1109         }
1110
1111         return ret;
1112 }
1113
1114 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1115         sd_bus_message *z = NULL;
1116         int r, ret = 0;
1117
1118         assert(bus);
1119         assert(m);
1120         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1121
1122         if (bus->fd < 0)
1123                 return -ENOTCONN;
1124
1125         if (bus->rqueue_size > 0) {
1126                 /* Dispatch a queued message */
1127
1128                 *m = bus->rqueue[0];
1129                 bus->rqueue_size --;
1130                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1131                 return 1;
1132         }
1133
1134         /* Try to read a new message */
1135         do {
1136                 r = message_read(bus, &z);
1137                 if (r < 0) {
1138                         sd_bus_close(bus);
1139                         return r;
1140                 }
1141                 if (r == 0)
1142                         return ret;
1143
1144                 r = 1;
1145         } while (!z);
1146
1147         *m = z;
1148         return 1;
1149 }
1150
1151 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1152         int r;
1153
1154         if (!bus)
1155                 return -EINVAL;
1156         if (bus->fd < 0)
1157                 return -ENOTCONN;
1158         if (!m)
1159                 return -EINVAL;
1160
1161         /* If the serial number isn't kept, then we know that no reply
1162          * is expected */
1163         if (!serial && !m->sealed)
1164                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1165
1166         r = bus_seal_message(bus, m);
1167         if (r < 0)
1168                 return r;
1169
1170         /* If this is a reply and no reply was requested, then let's
1171          * suppress this, if we can */
1172         if (m->dont_send && !serial)
1173                 return 0;
1174
1175         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1176                 size_t idx = 0;
1177
1178                 r = message_write(bus, m, &idx);
1179                 if (r < 0) {
1180                         sd_bus_close(bus);
1181                         return r;
1182                 } else if (idx < m->size)  {
1183                         /* Wasn't fully written. So let's remember how
1184                          * much was written. Note that the first entry
1185                          * of the wqueue array is always allocated so
1186                          * that we always can remember how much was
1187                          * written. */
1188                         bus->wqueue[0] = sd_bus_message_ref(m);
1189                         bus->wqueue_size = 1;
1190                         bus->windex = idx;
1191                 }
1192         } else {
1193                 sd_bus_message **q;
1194
1195                 /* Just append it to the queue. */
1196
1197                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1198                         return -ENOBUFS;
1199
1200                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1201                 if (!q)
1202                         return -ENOMEM;
1203
1204                 bus->wqueue = q;
1205                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1206         }
1207
1208         if (serial)
1209                 *serial = BUS_MESSAGE_SERIAL(m);
1210
1211         return 0;
1212 }
1213
1214 static usec_t calc_elapse(uint64_t usec) {
1215         if (usec == (uint64_t) -1)
1216                 return 0;
1217
1218         if (usec == 0)
1219                 usec = BUS_DEFAULT_TIMEOUT;
1220
1221         return now(CLOCK_MONOTONIC) + usec;
1222 }
1223
1224 static int timeout_compare(const void *a, const void *b) {
1225         const struct reply_callback *x = a, *y = b;
1226
1227         if (x->timeout != 0 && y->timeout == 0)
1228                 return -1;
1229
1230         if (x->timeout == 0 && y->timeout != 0)
1231                 return 1;
1232
1233         if (x->timeout < y->timeout)
1234                 return -1;
1235
1236         if (x->timeout > y->timeout)
1237                 return 1;
1238
1239         return 0;
1240 }
1241
1242 int sd_bus_send_with_reply(
1243                 sd_bus *bus,
1244                 sd_bus_message *m,
1245                 sd_message_handler_t callback,
1246                 void *userdata,
1247                 uint64_t usec,
1248                 uint64_t *serial) {
1249
1250         struct reply_callback *c;
1251         int r;
1252
1253         if (!bus)
1254                 return -EINVAL;
1255         if (bus->fd < 0)
1256                 return -ENOTCONN;
1257         if (!m)
1258                 return -EINVAL;
1259         if (!callback)
1260                 return -EINVAL;
1261         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1262                 return -EINVAL;
1263         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1264                 return -EINVAL;
1265
1266         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1267         if (r < 0)
1268                 return r;
1269
1270         if (usec != (uint64_t) -1) {
1271                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1272                 if (r < 0)
1273                         return r;
1274         }
1275
1276         r = bus_seal_message(bus, m);
1277         if (r < 0)
1278                 return r;
1279
1280         c = new(struct reply_callback, 1);
1281         if (!c)
1282                 return -ENOMEM;
1283
1284         c->callback = callback;
1285         c->userdata = userdata;
1286         c->serial = BUS_MESSAGE_SERIAL(m);
1287         c->timeout = calc_elapse(usec);
1288
1289         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1290         if (r < 0) {
1291                 free(c);
1292                 return r;
1293         }
1294
1295         if (c->timeout != 0) {
1296                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1297                 if (r < 0) {
1298                         c->timeout = 0;
1299                         sd_bus_send_with_reply_cancel(bus, c->serial);
1300                         return r;
1301                 }
1302         }
1303
1304         r = sd_bus_send(bus, m, serial);
1305         if (r < 0) {
1306                 sd_bus_send_with_reply_cancel(bus, c->serial);
1307                 return r;
1308         }
1309
1310         return r;
1311 }
1312
1313 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1314         struct reply_callback *c;
1315
1316         if (!bus)
1317                 return -EINVAL;
1318         if (serial == 0)
1319                 return -EINVAL;
1320
1321         c = hashmap_remove(bus->reply_callbacks, &serial);
1322         if (!c)
1323                 return 0;
1324
1325         if (c->timeout != 0)
1326                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1327
1328         free(c);
1329         return 1;
1330 }
1331
1332 int bus_ensure_running(sd_bus *bus) {
1333         int r;
1334
1335         assert(bus);
1336
1337         if (bus->state == BUS_RUNNING)
1338                 return 1;
1339
1340         for (;;) {
1341                 r = sd_bus_process(bus, NULL);
1342                 if (r < 0)
1343                         return r;
1344                 if (bus->state == BUS_RUNNING)
1345                         return 1;
1346                 if (r > 0)
1347                         continue;
1348
1349                 r = sd_bus_wait(bus, (uint64_t) -1);
1350                 if (r < 0)
1351                         return r;
1352         }
1353 }
1354
1355 int sd_bus_send_with_reply_and_block(
1356                 sd_bus *bus,
1357                 sd_bus_message *m,
1358                 uint64_t usec,
1359                 sd_bus_error *error,
1360                 sd_bus_message **reply) {
1361
1362         int r;
1363         usec_t timeout;
1364         uint64_t serial;
1365         bool room = false;
1366
1367         if (!bus)
1368                 return -EINVAL;
1369         if (bus->fd < 0)
1370                 return -ENOTCONN;
1371         if (!m)
1372                 return -EINVAL;
1373         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1374                 return -EINVAL;
1375         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1376                 return -EINVAL;
1377         if (bus_error_is_dirty(error))
1378                 return -EINVAL;
1379
1380         r = bus_ensure_running(bus);
1381         if (r < 0)
1382                 return r;
1383
1384         r = sd_bus_send(bus, m, &serial);
1385         if (r < 0)
1386                 return r;
1387
1388         timeout = calc_elapse(usec);
1389
1390         for (;;) {
1391                 usec_t left;
1392                 sd_bus_message *incoming = NULL;
1393
1394                 if (!room) {
1395                         sd_bus_message **q;
1396
1397                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1398                                 return -ENOBUFS;
1399
1400                         /* Make sure there's room for queuing this
1401                          * locally, before we read the message */
1402
1403                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1404                         if (!q)
1405                                 return -ENOMEM;
1406
1407                         bus->rqueue = q;
1408                         room = true;
1409                 }
1410
1411                 r = message_read(bus, &incoming);
1412                 if (r < 0)
1413                         return r;
1414                 if (incoming) {
1415
1416                         if (incoming->reply_serial == serial) {
1417                                 /* Found a match! */
1418
1419                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1420                                         *reply = incoming;
1421                                         return 0;
1422                                 }
1423
1424                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1425                                         int k;
1426
1427                                         r = sd_bus_error_copy(error, &incoming->error);
1428                                         if (r < 0) {
1429                                                 sd_bus_message_unref(incoming);
1430                                                 return r;
1431                                         }
1432
1433                                         k = bus_error_to_errno(&incoming->error);
1434                                         sd_bus_message_unref(incoming);
1435                                         return k;
1436                                 }
1437
1438                                 sd_bus_message_unref(incoming);
1439                                 return -EIO;
1440                         }
1441
1442                         /* There's already guaranteed to be room for
1443                          * this, so need to resize things here */
1444                         bus->rqueue[bus->rqueue_size ++] = incoming;
1445                         room = false;
1446
1447                         /* Try to read more, right-away */
1448                         continue;
1449                 }
1450                 if (r != 0)
1451                         continue;
1452
1453                 if (timeout > 0) {
1454                         usec_t n;
1455
1456                         n = now(CLOCK_MONOTONIC);
1457                         if (n >= timeout)
1458                                 return -ETIMEDOUT;
1459
1460                         left = timeout - n;
1461                 } else
1462                         left = (uint64_t) -1;
1463
1464                 r = bus_poll(bus, true, left);
1465                 if (r < 0)
1466                         return r;
1467
1468                 r = dispatch_wqueue(bus);
1469                 if (r < 0)
1470                         return r;
1471         }
1472 }
1473
1474 int sd_bus_get_fd(sd_bus *bus) {
1475         if (!bus)
1476                 return -EINVAL;
1477
1478         if (bus->fd < 0)
1479                 return -ENOTCONN;
1480
1481         return bus->fd;
1482 }
1483
1484 int sd_bus_get_events(sd_bus *bus) {
1485         int flags = 0;
1486
1487         if (!bus)
1488                 return -EINVAL;
1489         if (bus->fd < 0)
1490                 return -ENOTCONN;
1491
1492         if (bus->state == BUS_OPENING)
1493                 flags |= POLLOUT;
1494         else if (bus->state == BUS_AUTHENTICATING) {
1495
1496                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1497                         flags |= POLLOUT;
1498
1499                 flags |= POLLIN;
1500
1501         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1502                 if (bus->rqueue_size <= 0)
1503                         flags |= POLLIN;
1504                 if (bus->wqueue_size > 0)
1505                         flags |= POLLOUT;
1506         }
1507
1508         return flags;
1509 }
1510
1511 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1512         struct reply_callback *c;
1513
1514         if (!bus)
1515                 return -EINVAL;
1516         if (!timeout_usec)
1517                 return -EINVAL;
1518         if (bus->fd < 0)
1519                 return -ENOTCONN;
1520
1521         if (bus->state == BUS_AUTHENTICATING) {
1522                 *timeout_usec = bus->auth_timeout;
1523                 return 1;
1524         }
1525
1526         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1527                 return 0;
1528
1529         c = prioq_peek(bus->reply_callbacks_prioq);
1530         if (!c)
1531                 return 0;
1532
1533         *timeout_usec = c->timeout;
1534         return 1;
1535 }
1536
1537 static int process_timeout(sd_bus *bus) {
1538         struct reply_callback *c;
1539         usec_t n;
1540         int r;
1541
1542         assert(bus);
1543
1544         c = prioq_peek(bus->reply_callbacks_prioq);
1545         if (!c)
1546                 return 0;
1547
1548         n = now(CLOCK_MONOTONIC);
1549         if (c->timeout > n)
1550                 return 0;
1551
1552         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1553         hashmap_remove(bus->reply_callbacks, &c->serial);
1554
1555         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1556         free(c);
1557
1558         return r < 0 ? r : 1;
1559 }
1560
1561 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1562         struct reply_callback *c;
1563         int r;
1564
1565         assert(bus);
1566         assert(m);
1567
1568         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1569             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1570                 return 0;
1571
1572         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1573         if (!c)
1574                 return 0;
1575
1576         if (c->timeout != 0)
1577                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1578
1579         r = c->callback(bus, 0, m, c->userdata);
1580         free(c);
1581
1582         return r;
1583 }
1584
1585 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1586         struct filter_callback *l;
1587         int r;
1588
1589         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1590                 r = l->callback(bus, 0, m, l->userdata);
1591                 if (r != 0)
1592                         return r;
1593         }
1594
1595         return 0;
1596 }
1597
1598 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1599         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1600         int r;
1601
1602         assert(bus);
1603         assert(m);
1604
1605         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1606                 return 0;
1607
1608         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1609                 return 0;
1610
1611         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1612                 return 1;
1613
1614         if (streq_ptr(m->member, "Ping"))
1615                 r = sd_bus_message_new_method_return(bus, m, &reply);
1616         else if (streq_ptr(m->member, "GetMachineId")) {
1617                 sd_id128_t id;
1618                 char sid[33];
1619
1620                 r = sd_id128_get_machine(&id);
1621                 if (r < 0)
1622                         return r;
1623
1624                 r = sd_bus_message_new_method_return(bus, m, &reply);
1625                 if (r < 0)
1626                         return r;
1627
1628                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1629         } else {
1630                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1631
1632                 sd_bus_error_set(&error,
1633                                  "org.freedesktop.DBus.Error.UnknownMethod",
1634                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1635
1636                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1637         }
1638
1639         if (r < 0)
1640                 return r;
1641
1642         r = sd_bus_send(bus, reply, NULL);
1643         if (r < 0)
1644                 return r;
1645
1646         return 1;
1647 }
1648
1649 static int process_object(sd_bus *bus, sd_bus_message *m) {
1650         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1651         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1652         struct object_callback *c;
1653         char *p;
1654         int r;
1655         bool found = false;
1656
1657         assert(bus);
1658         assert(m);
1659
1660         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1661                 return 0;
1662
1663         if (hashmap_isempty(bus->object_callbacks))
1664                 return 0;
1665
1666         c = hashmap_get(bus->object_callbacks, m->path);
1667         if (c) {
1668                 r = c->callback(bus, 0, m, c->userdata);
1669                 if (r != 0)
1670                         return r;
1671
1672                 found = true;
1673         }
1674
1675         /* Look for fallback prefixes */
1676         p = strdupa(m->path);
1677         for (;;) {
1678                 char *e;
1679
1680                 e = strrchr(p, '/');
1681                 if (e == p || !e)
1682                         break;
1683
1684                 *e = 0;
1685
1686                 c = hashmap_get(bus->object_callbacks, p);
1687                 if (c && c->is_fallback) {
1688                         r = c->callback(bus, 0, m, c->userdata);
1689                         if (r != 0)
1690                                 return r;
1691
1692                         found = true;
1693                 }
1694         }
1695
1696         if (!found)
1697                 return 0;
1698
1699         sd_bus_error_set(&error,
1700                          "org.freedesktop.DBus.Error.UnknownMethod",
1701                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1702
1703         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1704         if (r < 0)
1705                 return r;
1706
1707         r = sd_bus_send(bus, reply, NULL);
1708         if (r < 0)
1709                 return r;
1710
1711         return 1;
1712 }
1713
1714 static int process_message(sd_bus *bus, sd_bus_message *m) {
1715         int r;
1716
1717         assert(bus);
1718         assert(m);
1719
1720         r = process_reply(bus, m);
1721         if (r != 0)
1722                 return r;
1723
1724         r = process_filter(bus, m);
1725         if (r != 0)
1726                 return r;
1727
1728         r = process_builtin(bus, m);
1729         if (r != 0)
1730                 return r;
1731
1732         return process_object(bus, m);
1733 }
1734
1735 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1736         int r;
1737
1738         /* Returns 0 when we didn't do anything. This should cause the
1739          * caller to invoke sd_bus_wait() before returning the next
1740          * time. Returns > 0 when we did something, which possibly
1741          * means *ret is filled in with an unprocessed message. */
1742
1743         if (!bus)
1744                 return -EINVAL;
1745         if (bus->fd < 0)
1746                 return -ENOTCONN;
1747
1748         if (bus->state == BUS_OPENING) {
1749                 struct pollfd p;
1750
1751                 zero(p);
1752                 p.fd = bus->fd;
1753                 p.events = POLLOUT;
1754
1755                 r = poll(&p, 1, 0);
1756                 if (r < 0)
1757                         return -errno;
1758
1759                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1760                         int error = 0;
1761                         socklen_t slen = sizeof(error);
1762
1763                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1764                         if (r < 0)
1765                                 bus->last_connect_error = errno;
1766                         else if (error != 0)
1767                                 bus->last_connect_error = error;
1768                         else if (p.revents & (POLLERR|POLLHUP))
1769                                 bus->last_connect_error = ECONNREFUSED;
1770                         else {
1771                                 r = bus_start_auth(bus);
1772                                 goto null_message;
1773                         }
1774
1775                         /* Try next address */
1776                         r = bus_start_connect(bus);
1777                         goto null_message;
1778                 }
1779
1780                 r = 0;
1781                 goto null_message;
1782
1783         } else if (bus->state == BUS_AUTHENTICATING) {
1784
1785                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1786                         return -ETIMEDOUT;
1787
1788                 r = bus_write_auth(bus);
1789                 if (r != 0)
1790                         goto null_message;
1791
1792                 r = bus_read_auth(bus);
1793                 goto null_message;
1794
1795         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1796                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1797                 int k;
1798
1799                 r = process_timeout(bus);
1800                 if (r != 0)
1801                         goto null_message;
1802
1803                 r = dispatch_wqueue(bus);
1804                 if (r != 0)
1805                         goto null_message;
1806
1807                 k = r;
1808                 r = dispatch_rqueue(bus, &m);
1809                 if (r < 0)
1810                         return r;
1811                 if (!m) {
1812                         if (r == 0)
1813                                 r = k;
1814                         goto null_message;
1815                 }
1816
1817                 r = process_message(bus, m);
1818                 if (r != 0)
1819                         goto null_message;
1820
1821                 if (ret) {
1822                         *ret = m;
1823                         m = NULL;
1824                         return 1;
1825                 }
1826
1827                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1828                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1829                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1830
1831                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1832
1833                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1834                         if (r < 0)
1835                                 return r;
1836
1837                         r = sd_bus_send(bus, reply, NULL);
1838                         if (r < 0)
1839                                 return r;
1840                 }
1841
1842                 return 1;
1843         }
1844
1845         assert_not_reached("Unknown state");
1846
1847 null_message:
1848         if (r >= 0 && ret)
1849                 *ret = NULL;
1850
1851         return r;
1852 }
1853
1854 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1855         struct pollfd p;
1856         int r, e;
1857         struct timespec ts;
1858         usec_t until, m;
1859
1860         assert(bus);
1861
1862         if (bus->fd < 0)
1863                 return -ENOTCONN;
1864
1865         e = sd_bus_get_events(bus);
1866         if (e < 0)
1867                 return e;
1868
1869         if (need_more)
1870                 e |= POLLIN;
1871
1872         r = sd_bus_get_timeout(bus, &until);
1873         if (r < 0)
1874                 return r;
1875         if (r == 0)
1876                 m = (uint64_t) -1;
1877         else {
1878                 usec_t n;
1879                 n = now(CLOCK_MONOTONIC);
1880                 m = until > n ? until - n : 0;
1881         }
1882
1883         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1884                 m = timeout_usec;
1885
1886         zero(p);
1887         p.fd = bus->fd;
1888         p.events = e;
1889
1890         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1891         if (r < 0)
1892                 return -errno;
1893
1894         return r > 0 ? 1 : 0;
1895 }
1896
1897 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1898
1899         if (!bus)
1900                 return -EINVAL;
1901         if (bus->fd < 0)
1902                 return -ENOTCONN;
1903         if (bus->rqueue_size > 0)
1904                 return 0;
1905
1906         return bus_poll(bus, false, timeout_usec);
1907 }
1908
1909 int sd_bus_flush(sd_bus *bus) {
1910         int r;
1911
1912         if (!bus)
1913                 return -EINVAL;
1914         if (bus->fd < 0)
1915                 return -ENOTCONN;
1916
1917         r = bus_ensure_running(bus);
1918         if (r < 0)
1919                 return r;
1920
1921         if (bus->wqueue_size <= 0)
1922                 return 0;
1923
1924         for (;;) {
1925                 r = dispatch_wqueue(bus);
1926                 if (r < 0)
1927                         return r;
1928
1929                 if (bus->wqueue_size <= 0)
1930                         return 0;
1931
1932                 r = bus_poll(bus, false, (uint64_t) -1);
1933                 if (r < 0)
1934                         return r;
1935         }
1936 }
1937
1938 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1939         struct filter_callback *f;
1940
1941         if (!bus)
1942                 return -EINVAL;
1943         if (!callback)
1944                 return -EINVAL;
1945
1946         f = new(struct filter_callback, 1);
1947         if (!f)
1948                 return -ENOMEM;
1949         f->callback = callback;
1950         f->userdata = userdata;
1951
1952         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1953         return 0;
1954 }
1955
1956 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1957         struct filter_callback *f;
1958
1959         if (!bus)
1960                 return -EINVAL;
1961         if (!callback)
1962                 return -EINVAL;
1963
1964         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1965                 if (f->callback == callback && f->userdata == userdata) {
1966                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1967                         free(f);
1968                         return 1;
1969                 }
1970         }
1971
1972         return 0;
1973 }
1974
1975 static int bus_add_object(
1976                 sd_bus *bus,
1977                 bool fallback,
1978                 const char *path,
1979                 sd_message_handler_t callback,
1980                 void *userdata) {
1981
1982         struct object_callback *c;
1983         int r;
1984
1985         if (!bus)
1986                 return -EINVAL;
1987         if (!path)
1988                 return -EINVAL;
1989         if (!callback)
1990                 return -EINVAL;
1991
1992         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
1993         if (r < 0)
1994                 return r;
1995
1996         c = new(struct object_callback, 1);
1997         if (!c)
1998                 return -ENOMEM;
1999
2000         c->path = strdup(path);
2001         if (!path) {
2002                 free(c);
2003                 return -ENOMEM;
2004         }
2005
2006         c->callback = callback;
2007         c->userdata = userdata;
2008         c->is_fallback = fallback;
2009
2010         r = hashmap_put(bus->object_callbacks, c->path, c);
2011         if (r < 0) {
2012                 free(c->path);
2013                 free(c);
2014                 return r;
2015         }
2016
2017         return 0;
2018 }
2019
2020 static int bus_remove_object(
2021                 sd_bus *bus,
2022                 bool fallback,
2023                 const char *path,
2024                 sd_message_handler_t callback,
2025                 void *userdata) {
2026
2027         struct object_callback *c;
2028
2029         if (!bus)
2030                 return -EINVAL;
2031         if (!path)
2032                 return -EINVAL;
2033         if (!callback)
2034                 return -EINVAL;
2035
2036         c = hashmap_get(bus->object_callbacks, path);
2037         if (!c)
2038                 return 0;
2039
2040         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2041                 return 0;
2042
2043         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2044
2045         free(c->path);
2046         free(c);
2047
2048         return 1;
2049 }
2050
2051 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2052         return bus_add_object(bus, false, path, callback, userdata);
2053 }
2054
2055 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2056         return bus_remove_object(bus, false, path, callback, userdata);
2057 }
2058
2059 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2060         return bus_add_object(bus, true, prefix, callback, userdata);
2061 }
2062
2063 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2064         return bus_remove_object(bus, true, prefix, callback, userdata);
2065 }