chiark / gitweb /
bus: make optional whether unix socket passing is negotiated and whether hello is...
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "missing.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         struct object_callback *c;
44         unsigned i;
45
46         assert(b);
47
48         if (b->fd >= 0)
49                 close_nointr_nofail(b->fd);
50
51         free(b->rbuffer);
52         free(b->unique_name);
53         free(b->auth_uid);
54         free(b->address);
55
56         close_many(b->fds, b->n_fds);
57         free(b->fds);
58
59         for (i = 0; i < b->rqueue_size; i++)
60                 sd_bus_message_unref(b->rqueue[i]);
61         free(b->rqueue);
62
63         for (i = 0; i < b->wqueue_size; i++)
64                 sd_bus_message_unref(b->wqueue[i]);
65         free(b->wqueue);
66
67         hashmap_free_free(b->reply_callbacks);
68         prioq_free(b->reply_callbacks_prioq);
69
70         while ((f = b->filter_callbacks)) {
71                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
72                 free(f);
73         }
74
75         while ((c = hashmap_steal_first(b->object_callbacks))) {
76                 free(c->path);
77                 free(c);
78         }
79
80         hashmap_free(b->object_callbacks);
81
82         free(b);
83 }
84
85 int sd_bus_new(sd_bus **ret) {
86         sd_bus *r;
87
88         if (!ret)
89                 return -EINVAL;
90
91         r = new0(sd_bus, 1);
92         if (!r)
93                 return -ENOMEM;
94
95         r->n_ref = 1;
96         r->fd = -1;
97         r->message_version = 1;
98         r->negotiate_fds = true;
99
100         /* We guarantee that wqueue always has space for at least one
101          * entry */
102         r->wqueue = new(sd_bus_message*, 1);
103         if (!r->wqueue) {
104                 free(r);
105                 return -ENOMEM;
106         }
107
108         *ret = r;
109         return 0;
110 }
111
112 int sd_bus_set_address(sd_bus *bus, const char *address) {
113         char *a;
114
115         if (!bus)
116                 return -EINVAL;
117         if (bus->state != BUS_UNSET)
118                 return -EPERM;
119         if (!address)
120                 return -EINVAL;
121
122         a = strdup(address);
123         if (!a)
124                 return -ENOMEM;
125
126         free(bus->address);
127         bus->address = a;
128
129         return 0;
130 }
131
132 int sd_bus_set_fd(sd_bus *bus, int fd) {
133         if (!bus)
134                 return -EINVAL;
135         if (bus->state != BUS_UNSET)
136                 return -EPERM;
137         if (fd < 0)
138                 return -EINVAL;
139
140         bus->fd = fd;
141         return 0;
142 }
143
144 int sd_bus_set_hello(sd_bus *bus, int b) {
145         if (!bus)
146                 return -EINVAL;
147         if (bus->state != BUS_UNSET)
148                 return -EPERM;
149
150         bus->send_hello = !!b;
151         return 0;
152 }
153
154 int sd_bus_set_negotiate_fds(sd_bus *bus, int b) {
155         if (!bus)
156                 return -EINVAL;
157         if (bus->state != BUS_UNSET)
158                 return -EPERM;
159
160         bus->negotiate_fds = !!b;
161         return 0;
162 }
163
164 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
165         const char *s;
166         int r;
167
168         assert(bus);
169         assert(bus->state == BUS_HELLO);
170
171         if (error != 0)
172                 return -error;
173
174         assert(reply);
175
176         r = sd_bus_message_read(reply, "s", &s);
177         if (r < 0)
178                 return r;
179
180         if (!service_name_is_valid(s) || s[0] != ':')
181                 return -EBADMSG;
182
183         bus->unique_name = strdup(s);
184         if (!bus->unique_name)
185                 return -ENOMEM;
186
187         bus->state = BUS_RUNNING;
188
189         return 1;
190 }
191
192 static int bus_send_hello(sd_bus *bus) {
193         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
194         int r;
195
196         assert(bus);
197
198         if (!bus->send_hello)
199                 return 0;
200
201         r = sd_bus_message_new_method_call(
202                         bus,
203                         "org.freedesktop.DBus",
204                         "/",
205                         "org.freedesktop.DBus",
206                         "Hello",
207                         &m);
208         if (r < 0)
209                 return r;
210
211         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
212         if (r < 0)
213                 return r;
214
215         return r;
216 }
217
218 static int bus_start_running(sd_bus *bus) {
219         assert(bus);
220
221         if (bus->send_hello) {
222                 bus->state = BUS_HELLO;
223                 return 1;
224         }
225
226         bus->state = BUS_RUNNING;
227         return 1;
228 }
229
/* Tries to parse one "<key>=<value>" item at *p.
 *
 * If *p does not start with "<key>=", nothing is consumed and 0 is
 * returned. Otherwise the value, which runs up to the next ',' or the
 * end of the string, is unescaped ("%XX" hex sequences) into a newly
 * allocated string stored in *value, *p is moved past the item and
 * its trailing comma, and 1 is returned.
 *
 * Returns -EINVAL if *value is already set (key given twice), a
 * negative value from unhexchar() on a broken escape sequence and
 * -ENOMEM on allocation failure. */
static int parse_address_key(const char **p, const char *key, char **value) {
        size_t key_len, used = 0;
        const char *cursor;
        char *buf = NULL;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        /* Not our key unless the text starts with exactly "<key>=" */
        key_len = strlen(key);
        if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                return 0;

        /* Each key may be specified at most once */
        if (*value)
                return -EINVAL;

        for (cursor = *p + key_len + 1; *cursor != ',' && *cursor != 0;) {
                char ch, *grown;

                if (*cursor != '%')
                        ch = *(cursor++);
                else {
                        int hi, lo;

                        /* A NUL in either digit position makes
                         * unhexchar() fail, hence we never read
                         * beyond the end of the string */
                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the trailing NUL */
                grown = realloc(buf, used + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[used++] = ch;
        }

        if (buf)
                buf[used] = 0;
        else {
                /* Empty value */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;
        *value = buf;
        return 1;
}
300
/* Skips one address item: moves *p just behind the next ',' or, if
 * there is none, to the terminating NUL of the string. */
static void skip_address_key(const char **p) {
        const char *comma;

        assert(p);
        assert(*p);

        comma = strchr(*p, ',');
        if (comma)
                *p = comma + 1;
        else
                *p += strlen(*p);
}
310
311 static int bus_parse_next_address(sd_bus *b) {
312         const char *a, *p;
313         _cleanup_free_ char *guid = NULL;
314         int r;
315
316         assert(b);
317
318         if (!b->address)
319                 return 0;
320         if (b->address[b->address_index] == 0)
321                 return 0;
322
323         a = b->address + b->address_index;
324
325         zero(b->sockaddr);
326         b->sockaddr_size = 0;
327         b->peer = SD_ID128_NULL;
328
329         if (startswith(a, "unix:")) {
330                 _cleanup_free_ char *path = NULL, *abstract = NULL;
331
332                 p = a + 5;
333                 while (*p != 0) {
334                         r = parse_address_key(&p, "guid", &guid);
335                         if (r < 0)
336                                 return r;
337                         else if (r > 0)
338                                 continue;
339
340                         r = parse_address_key(&p, "path", &path);
341                         if (r < 0)
342                                 return r;
343                         else if (r > 0)
344                                 continue;
345
346                         r = parse_address_key(&p, "abstract", &abstract);
347                         if (r < 0)
348                                 return r;
349                         else if (r > 0)
350                                 continue;
351
352                         skip_address_key(&p);
353                 }
354
355                 if (!path && !abstract)
356                         return -EINVAL;
357
358                 if (path && abstract)
359                         return -EINVAL;
360
361                 if (path) {
362                         size_t l;
363
364                         l = strlen(path);
365                         if (l > sizeof(b->sockaddr.un.sun_path))
366                                 return -E2BIG;
367
368                         b->sockaddr.un.sun_family = AF_UNIX;
369                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
370                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
371                 } else if (abstract) {
372                         size_t l;
373
374                         l = strlen(abstract);
375                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
376                                 return -E2BIG;
377
378                         b->sockaddr.un.sun_family = AF_UNIX;
379                         b->sockaddr.un.sun_path[0] = 0;
380                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
381                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
382                 }
383
384         } else if (startswith(a, "tcp:")) {
385                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
386                 struct addrinfo hints, *result;
387
388                 p = a + 4;
389                 while (*p != 0) {
390                         r = parse_address_key(&p, "guid", &guid);
391                         if (r < 0)
392                                 return r;
393                         else if (r > 0)
394                                 continue;
395
396                         r = parse_address_key(&p, "host", &host);
397                         if (r < 0)
398                                 return r;
399                         else if (r > 0)
400                                 continue;
401
402                         r = parse_address_key(&p, "port", &port);
403                         if (r < 0)
404                                 return r;
405                         else if (r > 0)
406                                 continue;
407
408                         r = parse_address_key(&p, "family", &family);
409                         if (r < 0)
410                                 return r;
411                         else if (r > 0)
412                                 continue;
413
414                         skip_address_key(&p);
415                 }
416
417                 if (!host || !port)
418                         return -EINVAL;
419
420                 zero(hints);
421                 hints.ai_socktype = SOCK_STREAM;
422                 hints.ai_flags = AI_ADDRCONFIG;
423
424                 if (family) {
425                         if (streq(family, "ipv4"))
426                                 hints.ai_family = AF_INET;
427                         else if (streq(family, "ipv6"))
428                                 hints.ai_family = AF_INET6;
429                         else
430                                 return -EINVAL;
431                 }
432
433                 r = getaddrinfo(host, port, &hints, &result);
434                 if (r == EAI_SYSTEM)
435                         return -errno;
436                 else if (r != 0)
437                         return -EADDRNOTAVAIL;
438
439                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
440                 b->sockaddr_size = result->ai_addrlen;
441
442                 freeaddrinfo(result);
443         }
444
445         if (guid) {
446                 r = sd_id128_from_string(guid, &b->peer);
447                 if (r < 0)
448                         return r;
449         }
450
451         b->address_index = p - b->address;
452         return 1;
453 }
454
/* Marks the first size bytes of the iovec array as consumed, starting
 * at entry *idx. Entries that are consumed entirely are reset to
 * NULL/0 and *idx is moved past them; an entry consumed only in part
 * has its base advanced and its length reduced.
 *
 * The caller must make sure the entries from *idx on cover at least
 * size bytes. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {

        while (size > 0) {
                struct iovec *cur = &iov[*idx];

                if (size < cur->iov_len) {
                        /* Ends inside this entry: shrink it from the
                         * front and stop */
                        cur->iov_base = (uint8_t*) cur->iov_base + size;
                        cur->iov_len -= size;
                        break;
                }

                /* This entry is used up completely */
                size -= cur->iov_len;

                cur->iov_base = NULL;
                cur->iov_len = 0;

                ++*idx;
        }
}
474
475 static int bus_write_auth(sd_bus *b) {
476         struct msghdr mh;
477         ssize_t k;
478
479         assert(b);
480         assert(b->state == BUS_AUTHENTICATING);
481
482         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
483                 return 0;
484
485         if (b->auth_timeout == 0)
486                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
487
488         zero(mh);
489         mh.msg_iov = b->auth_iovec + b->auth_index;
490         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
491
492         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
493         if (k < 0)
494                 return errno == EAGAIN ? 0 : -errno;
495
496         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
497
498         return 1;
499 }
500
501 static int bus_auth_verify(sd_bus *b) {
502         char *e, *f, *start;
503         sd_id128_t peer;
504         unsigned i;
505         int r;
506
507         /* We expect two response lines: "OK" and possibly
508          * "AGREE_UNIX_FD" */
509
510         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
511         if (!e)
512                 return 0;
513
514         if (b->negotiate_fds) {
515                 f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
516                 if (!f)
517                         return 0;
518
519                 start = f + 2;
520         } else {
521                 f = NULL;
522                 start = e + 2;
523         }
524
525         /* Nice! We got all the lines we need. First check the OK
526          * line */
527
528         if (e - (char*) b->rbuffer != 3 + 32)
529                 return -EPERM;
530
531         if (memcmp(b->rbuffer, "OK ", 3))
532                 return -EPERM;
533
534         for (i = 0; i < 32; i += 2) {
535                 int x, y;
536
537                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
538                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
539
540                 if (x < 0 || y < 0)
541                         return -EINVAL;
542
543                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
544         }
545
546         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
547             !sd_id128_equal(b->peer, peer))
548                 return -EPERM;
549
550         b->peer = peer;
551
552         /* And possibly check the second line, too */
553
554         if (f)
555                 b->can_fds =
556                         (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
557                         memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
558
559         b->rbuffer_size -= (start - (char*) b->rbuffer);
560         memmove(b->rbuffer, start, b->rbuffer_size);
561
562         r = bus_start_running(b);
563         if (r < 0)
564                 return r;
565
566         return 1;
567 }
568
569 static int bus_read_auth(sd_bus *b) {
570         struct msghdr mh;
571         struct iovec iov;
572         size_t n;
573         ssize_t k;
574         int r;
575         void *p;
576
577         assert(b);
578
579         r = bus_auth_verify(b);
580         if (r != 0)
581                 return r;
582
583         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
584
585         if (n > BUS_AUTH_SIZE_MAX)
586                 n = BUS_AUTH_SIZE_MAX;
587
588         if (b->rbuffer_size >= n)
589                 return -ENOBUFS;
590
591         p = realloc(b->rbuffer, n);
592         if (!p)
593                 return -ENOMEM;
594
595         b->rbuffer = p;
596
597         zero(iov);
598         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
599         iov.iov_len = n - b->rbuffer_size;
600
601         zero(mh);
602         mh.msg_iov = &iov;
603         mh.msg_iovlen = 1;
604
605         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
606         if (k < 0)
607                 return errno == EAGAIN ? 0 : -errno;
608         if (k == 0)
609                 return -ECONNRESET;
610
611         b->rbuffer_size += k;
612
613         r = bus_auth_verify(b);
614         if (r != 0)
615                 return r;
616
617         return 1;
618 }
619
620 static int bus_setup_fd(sd_bus *b) {
621         int one;
622
623         assert(b);
624
625         /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
626          * just in case. This is actually irrelavant for */
627         one = 1;
628         setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
629         setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
630
631         /* Increase the buffers to a MB */
632         fd_inc_rcvbuf(b->fd, 1024*1024);
633         fd_inc_sndbuf(b->fd, 1024*1024);
634
635         return 0;
636 }
637
638 static int bus_start_auth(sd_bus *b) {
639         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
640         static const char auth_suffix_with_unix_fd[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
641         static const char auth_suffix_without_unix_fd[] = "\r\nBEGIN\r\n";
642
643         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
644         size_t l;
645         const char *auth_suffix;
646
647         assert(b);
648
649         b->state = BUS_AUTHENTICATING;
650
651         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
652         char_array_0(text);
653
654         l = strlen(text);
655         b->auth_uid = hexmem(text, l);
656         if (!b->auth_uid)
657                 return -ENOMEM;
658
659         auth_suffix = b->negotiate_fds ? auth_suffix_with_unix_fd : auth_suffix_without_unix_fd;
660
661         b->auth_iovec[0].iov_base = (void*) auth_prefix;
662         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
663         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
664         b->auth_iovec[1].iov_len = l * 2;
665         b->auth_iovec[2].iov_base = (void*) auth_suffix;
666         b->auth_iovec[2].iov_len = strlen(auth_suffix);
667         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
668
669         return bus_write_auth(b);
670 }
671
672 static int bus_start_connect(sd_bus *b) {
673         int r;
674
675         assert(b);
676         assert(b->fd < 0);
677
678         for (;;) {
679                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
680                         r = bus_parse_next_address(b);
681                         if (r < 0)
682                                 return r;
683                         if (r == 0)
684                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
685                 }
686
687                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
688                 if (b->fd < 0) {
689                         b->last_connect_error = errno;
690                         goto try_again;
691                 }
692
693                 r = bus_setup_fd(b);
694                 if (r < 0) {
695                         b->last_connect_error = errno;
696                         goto try_again;
697                 }
698
699                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
700                 if (r < 0) {
701                         if (errno == EINPROGRESS)
702                                 return 1;
703
704                         b->last_connect_error = errno;
705                         goto try_again;
706                 }
707
708                 return bus_start_auth(b);
709
710         try_again:
711                 zero(b->sockaddr);
712
713                 if (b->fd >= 0) {
714                         close_nointr_nofail(b->fd);
715                         b->fd = -1;
716                 }
717         }
718 }
719
720 static int bus_start_fd(sd_bus *b) {
721         int r;
722
723         assert(b);
724
725         r = fd_nonblock(b->fd, true);
726         if (r < 0)
727                 return r;
728
729         r = fd_cloexec(b->fd, true);
730         if (r < 0)
731                 return r;
732
733         r = bus_setup_fd(b);
734         if (r < 0)
735                 return r;
736
737         return bus_start_auth(b);
738 }
739
740 int sd_bus_start(sd_bus *bus) {
741         int r;
742
743         if (!bus)
744                 return -EINVAL;
745         if (bus->state != BUS_UNSET)
746                 return -EPERM;
747
748         bus->state = BUS_OPENING;
749
750         if (bus->fd >= 0)
751                 r = bus_start_fd(bus);
752         else if (bus->address)
753                 r = bus_start_connect(bus);
754         else
755                 return -EINVAL;
756
757         if (r < 0)
758                 return r;
759
760         return bus_send_hello(bus);
761 }
762
763 int sd_bus_open_system(sd_bus **ret) {
764         const char *e;
765         sd_bus *b;
766         int r;
767
768         if (!ret)
769                 return -EINVAL;
770
771         r = sd_bus_new(&b);
772         if (r < 0)
773                 return r;
774
775         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
776         if (e) {
777                 r = sd_bus_set_address(b, e);
778                 if (r < 0)
779                         goto fail;
780         } else {
781                 b->sockaddr.un.sun_family = AF_UNIX;
782                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
783                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
784         }
785
786         b->send_hello = true;
787
788         r = sd_bus_start(b);
789         if (r < 0)
790                 goto fail;
791
792         *ret = b;
793         return 0;
794
795 fail:
796         bus_free(b);
797         return r;
798 }
799
800 int sd_bus_open_user(sd_bus **ret) {
801         const char *e;
802         sd_bus *b;
803         size_t l;
804         int r;
805
806         if (!ret)
807                 return -EINVAL;
808
809         r = sd_bus_new(&b);
810         if (r < 0)
811                 return r;
812
813         e = getenv("DBUS_SESSION_BUS_ADDRESS");
814         if (e) {
815                 r = sd_bus_set_address(b, e);
816                 if (r < 0)
817                         goto fail;
818         } else {
819                 e = getenv("XDG_RUNTIME_DIR");
820                 if (!e) {
821                         r = -ENOENT;
822                         goto fail;
823                 }
824
825                 l = strlen(e);
826                 if (l + 4 > sizeof(b->sockaddr.un.sun_path)) {
827                         r = -E2BIG;
828                         goto fail;
829                 }
830
831                 b->sockaddr.un.sun_family = AF_UNIX;
832                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
833                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
834         }
835
836         b->send_hello = true;
837
838         r = sd_bus_start(b);
839         if (r < 0)
840                 goto fail;
841
842         *ret = b;
843         return 0;
844
845 fail:
846         bus_free(b);
847         return r;
848 }
849
850 void sd_bus_close(sd_bus *bus) {
851         if (!bus)
852                 return;
853         if (bus->fd < 0)
854                 return;
855
856         close_nointr_nofail(bus->fd);
857         bus->fd = -1;
858 }
859
860 sd_bus *sd_bus_ref(sd_bus *bus) {
861         if (!bus)
862                 return NULL;
863
864         assert(bus->n_ref > 0);
865
866         bus->n_ref++;
867         return bus;
868 }
869
870 sd_bus *sd_bus_unref(sd_bus *bus) {
871         if (!bus)
872                 return NULL;
873
874         assert(bus->n_ref > 0);
875         bus->n_ref--;
876
877         if (bus->n_ref <= 0)
878                 bus_free(bus);
879
880         return NULL;
881 }
882
883 int sd_bus_is_open(sd_bus *bus) {
884         if (!bus)
885                 return -EINVAL;
886
887         return bus->state != BUS_UNSET && bus->fd >= 0;
888 }
889
890 int sd_bus_can_send(sd_bus *bus, char type) {
891         int r;
892
893         if (!bus)
894                 return -EINVAL;
895         if (bus->fd < 0)
896                 return -ENOTCONN;
897
898         if (type == SD_BUS_TYPE_UNIX_FD) {
899                 if (!bus->negotiate_fds)
900                         return 0;
901
902                 r = bus_ensure_running(bus);
903                 if (r < 0)
904                         return r;
905
906                 return bus->can_fds;
907         }
908
909         return bus_type_is_valid(type);
910 }
911
912 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
913         int r;
914
915         if (!bus)
916                 return -EINVAL;
917         if (!peer)
918                 return -EINVAL;
919
920         r = bus_ensure_running(bus);
921         if (r < 0)
922                 return r;
923
924         *peer = bus->peer;
925         return 0;
926 }
927
928 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
929         assert(m);
930
931         if (m->header->version > b->message_version)
932                 return -EPERM;
933
934         if (m->sealed)
935                 return 0;
936
937         return bus_message_seal(m, ++b->serial);
938 }
939
940 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
941         struct msghdr mh;
942         struct iovec *iov;
943         ssize_t k;
944         size_t n;
945         unsigned j;
946
947         assert(bus);
948         assert(m);
949         assert(idx);
950         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
951
952         if (*idx >= m->size)
953                 return 0;
954         zero(mh);
955
956         if (m->n_fds > 0) {
957                 struct cmsghdr *control;
958                 control = alloca(CMSG_SPACE(sizeof(int) * m->n_fds));
959
960                 mh.msg_control = control;
961                 control->cmsg_level = SOL_SOCKET;
962                 control->cmsg_type = SCM_RIGHTS;
963                 mh.msg_controllen = control->cmsg_len = CMSG_LEN(sizeof(int) * m->n_fds);
964                 memcpy(CMSG_DATA(control), m->fds, sizeof(int) * m->n_fds);
965         }
966
967         n = m->n_iovec * sizeof(struct iovec);
968         iov = alloca(n);
969         memcpy(iov, m->iovec, n);
970
971         j = 0;
972         iovec_advance(iov, &j, *idx);
973
974         mh.msg_iov = iov;
975         mh.msg_iovlen = m->n_iovec;
976
977         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
978         if (k < 0)
979                 return errno == EAGAIN ? 0 : -errno;
980
981         *idx += (size_t) k;
982         return 1;
983 }
984
985 static int message_read_need(sd_bus *bus, size_t *need) {
986         uint32_t a, b;
987         uint8_t e;
988         uint64_t sum;
989
990         assert(bus);
991         assert(need);
992         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
993
994         if (bus->rbuffer_size < sizeof(struct bus_header)) {
995                 *need = sizeof(struct bus_header) + 8;
996
997                 /* Minimum message size:
998                  *
999                  * Header +
1000                  *
1001                  *  Method Call: +2 string headers
1002                  *       Signal: +3 string headers
1003                  * Method Error: +1 string headers
1004                  *               +1 uint32 headers
1005                  * Method Reply: +1 uint32 headers
1006                  *
1007                  * A string header is at least 9 bytes
1008                  * A uint32 header is at least 8 bytes
1009                  *
1010                  * Hence the minimum message size of a valid message
1011                  * is header + 8 bytes */
1012
1013                 return 0;
1014         }
1015
1016         a = ((const uint32_t*) bus->rbuffer)[1];
1017         b = ((const uint32_t*) bus->rbuffer)[3];
1018
1019         e = ((const uint8_t*) bus->rbuffer)[0];
1020         if (e == SD_BUS_LITTLE_ENDIAN) {
1021                 a = le32toh(a);
1022                 b = le32toh(b);
1023         } else if (e == SD_BUS_BIG_ENDIAN) {
1024                 a = be32toh(a);
1025                 b = be32toh(b);
1026         } else
1027                 return -EBADMSG;
1028
1029         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
1030         if (sum >= BUS_MESSAGE_SIZE_MAX)
1031                 return -ENOBUFS;
1032
1033         *need = (size_t) sum;
1034         return 0;
1035 }
1036
1037 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
1038         sd_bus_message *t;
1039         void *b;
1040         int r;
1041
1042         assert(bus);
1043         assert(m);
1044         assert(bus->rbuffer_size >= size);
1045         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1046
1047         if (bus->rbuffer_size > size) {
1048                 b = memdup((const uint8_t*) bus->rbuffer + size,
1049                            bus->rbuffer_size - size);
1050                 if (!b)
1051                         return -ENOMEM;
1052         } else
1053                 b = NULL;
1054
1055         r = bus_message_from_malloc(bus->rbuffer, size,
1056                                     bus->fds, bus->n_fds,
1057                                     bus->ucred_valid ? &bus->ucred : NULL,
1058                                     bus->label[0] ? bus->label : NULL,
1059                                     &t);
1060         if (r < 0) {
1061                 free(b);
1062                 return r;
1063         }
1064
1065         bus->rbuffer = b;
1066         bus->rbuffer_size -= size;
1067
1068         bus->fds = NULL;
1069         bus->n_fds = 0;
1070
1071         *m = t;
1072         return 1;
1073 }
1074
1075 static int message_read(sd_bus *bus, sd_bus_message **m) {
1076         struct msghdr mh;
1077         struct iovec iov;
1078         ssize_t k;
1079         size_t need;
1080         int r;
1081         void *b;
1082         union {
1083                 struct cmsghdr cmsghdr;
1084                 uint8_t buf[CMSG_SPACE(sizeof(int) * BUS_FDS_MAX) +
1085                             CMSG_SPACE(sizeof(struct ucred)) +
1086                             CMSG_SPACE(NAME_MAX)]; /*selinux label */
1087         } control;
1088         struct cmsghdr *cmsg;
1089
1090         assert(bus);
1091         assert(m);
1092         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1093
1094         r = message_read_need(bus, &need);
1095         if (r < 0)
1096                 return r;
1097
1098         if (bus->rbuffer_size >= need)
1099                 return message_make(bus, need, m);
1100
1101         b = realloc(bus->rbuffer, need);
1102         if (!b)
1103                 return -ENOMEM;
1104
1105         bus->rbuffer = b;
1106
1107         zero(iov);
1108         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1109         iov.iov_len = need - bus->rbuffer_size;
1110
1111         zero(mh);
1112         mh.msg_iov = &iov;
1113         mh.msg_iovlen = 1;
1114         mh.msg_control = &control;
1115         mh.msg_controllen = sizeof(control);
1116
1117         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1118         if (k < 0)
1119                 return errno == EAGAIN ? 0 : -errno;
1120         if (k == 0)
1121                 return -ECONNRESET;
1122
1123         bus->rbuffer_size += k;
1124
1125         for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1126                 if (cmsg->cmsg_level == SOL_SOCKET &&
1127                     cmsg->cmsg_type == SCM_RIGHTS) {
1128                         int n, *f;
1129
1130                         n = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
1131
1132                         f = realloc(bus->fds, sizeof(int) + (bus->n_fds + n));
1133                         if (!f) {
1134                                 close_many((int*) CMSG_DATA(cmsg), n);
1135                                 return -ENOMEM;
1136                         }
1137
1138                         memcpy(f + bus->n_fds, CMSG_DATA(cmsg), n * sizeof(int));
1139                         bus->fds = f;
1140                         bus->n_fds += n;
1141                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1142                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1143                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1144
1145                         memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1146                         bus->ucred_valid = true;
1147
1148                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1149                          cmsg->cmsg_type == SCM_SECURITY) {
1150
1151                         size_t l;
1152                         l = cmsg->cmsg_len - CMSG_LEN(0);
1153                         memcpy(&bus->label, CMSG_DATA(cmsg), l);
1154                         bus->label[l] = 0;
1155                 }
1156         }
1157
1158         r = message_read_need(bus, &need);
1159         if (r < 0)
1160                 return r;
1161
1162         if (bus->rbuffer_size >= need)
1163                 return message_make(bus, need, m);
1164
1165         return 1;
1166 }
1167
1168 static int dispatch_wqueue(sd_bus *bus) {
1169         int r, ret = 0;
1170
1171         assert(bus);
1172         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1173
1174         if (bus->fd < 0)
1175                 return -ENOTCONN;
1176
1177         while (bus->wqueue_size > 0) {
1178
1179                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1180                 if (r < 0) {
1181                         sd_bus_close(bus);
1182                         return r;
1183                 } else if (r == 0)
1184                         /* Didn't do anything this time */
1185                         return ret;
1186                 else if (bus->windex >= bus->wqueue[0]->size) {
1187                         /* Fully written. Let's drop the entry from
1188                          * the queue.
1189                          *
1190                          * This isn't particularly optimized, but
1191                          * well, this is supposed to be our worst-case
1192                          * buffer only, and the socket buffer is
1193                          * supposed to be our primary buffer, and if
1194                          * it got full, then all bets are off
1195                          * anyway. */
1196
1197                         sd_bus_message_unref(bus->wqueue[0]);
1198                         bus->wqueue_size --;
1199                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1200                         bus->windex = 0;
1201
1202                         ret = 1;
1203                 }
1204         }
1205
1206         return ret;
1207 }
1208
1209 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1210         sd_bus_message *z = NULL;
1211         int r, ret = 0;
1212
1213         assert(bus);
1214         assert(m);
1215         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1216
1217         if (bus->fd < 0)
1218                 return -ENOTCONN;
1219
1220         if (bus->rqueue_size > 0) {
1221                 /* Dispatch a queued message */
1222
1223                 *m = bus->rqueue[0];
1224                 bus->rqueue_size --;
1225                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1226                 return 1;
1227         }
1228
1229         /* Try to read a new message */
1230         do {
1231                 r = message_read(bus, &z);
1232                 if (r < 0) {
1233                         sd_bus_close(bus);
1234                         return r;
1235                 }
1236                 if (r == 0)
1237                         return ret;
1238
1239                 r = 1;
1240         } while (!z);
1241
1242         *m = z;
1243         return 1;
1244 }
1245
1246 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1247         int r;
1248
1249         if (!bus)
1250                 return -EINVAL;
1251         if (bus->state == BUS_UNSET)
1252                 return -ENOTCONN;
1253         if (bus->fd < 0)
1254                 return -ENOTCONN;
1255         if (!m)
1256                 return -EINVAL;
1257
1258         if (m->n_fds > 0) {
1259                 r = sd_bus_can_send(bus, SD_BUS_TYPE_UNIX_FD);
1260                 if (r < 0)
1261                         return r;
1262                 if (r == 0)
1263                         return -ENOTSUP;
1264         }
1265
1266         /* If the serial number isn't kept, then we know that no reply
1267          * is expected */
1268         if (!serial && !m->sealed)
1269                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1270
1271         r = bus_seal_message(bus, m);
1272         if (r < 0)
1273                 return r;
1274
1275         /* If this is a reply and no reply was requested, then let's
1276          * suppress this, if we can */
1277         if (m->dont_send && !serial)
1278                 return 0;
1279
1280         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1281                 size_t idx = 0;
1282
1283                 r = message_write(bus, m, &idx);
1284                 if (r < 0) {
1285                         sd_bus_close(bus);
1286                         return r;
1287                 } else if (idx < m->size)  {
1288                         /* Wasn't fully written. So let's remember how
1289                          * much was written. Note that the first entry
1290                          * of the wqueue array is always allocated so
1291                          * that we always can remember how much was
1292                          * written. */
1293                         bus->wqueue[0] = sd_bus_message_ref(m);
1294                         bus->wqueue_size = 1;
1295                         bus->windex = idx;
1296                 }
1297         } else {
1298                 sd_bus_message **q;
1299
1300                 /* Just append it to the queue. */
1301
1302                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1303                         return -ENOBUFS;
1304
1305                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1306                 if (!q)
1307                         return -ENOMEM;
1308
1309                 bus->wqueue = q;
1310                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1311         }
1312
1313         if (serial)
1314                 *serial = BUS_MESSAGE_SERIAL(m);
1315
1316         return 0;
1317 }
1318
1319 static usec_t calc_elapse(uint64_t usec) {
1320         if (usec == (uint64_t) -1)
1321                 return 0;
1322
1323         if (usec == 0)
1324                 usec = BUS_DEFAULT_TIMEOUT;
1325
1326         return now(CLOCK_MONOTONIC) + usec;
1327 }
1328
1329 static int timeout_compare(const void *a, const void *b) {
1330         const struct reply_callback *x = a, *y = b;
1331
1332         if (x->timeout != 0 && y->timeout == 0)
1333                 return -1;
1334
1335         if (x->timeout == 0 && y->timeout != 0)
1336                 return 1;
1337
1338         if (x->timeout < y->timeout)
1339                 return -1;
1340
1341         if (x->timeout > y->timeout)
1342                 return 1;
1343
1344         return 0;
1345 }
1346
1347 int sd_bus_send_with_reply(
1348                 sd_bus *bus,
1349                 sd_bus_message *m,
1350                 sd_message_handler_t callback,
1351                 void *userdata,
1352                 uint64_t usec,
1353                 uint64_t *serial) {
1354
1355         struct reply_callback *c;
1356         int r;
1357
1358         if (!bus)
1359                 return -EINVAL;
1360         if (bus->state == BUS_UNSET)
1361                 return -ENOTCONN;
1362         if (bus->fd < 0)
1363                 return -ENOTCONN;
1364         if (!m)
1365                 return -EINVAL;
1366         if (!callback)
1367                 return -EINVAL;
1368         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1369                 return -EINVAL;
1370         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1371                 return -EINVAL;
1372
1373         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1374         if (r < 0)
1375                 return r;
1376
1377         if (usec != (uint64_t) -1) {
1378                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1379                 if (r < 0)
1380                         return r;
1381         }
1382
1383         r = bus_seal_message(bus, m);
1384         if (r < 0)
1385                 return r;
1386
1387         c = new(struct reply_callback, 1);
1388         if (!c)
1389                 return -ENOMEM;
1390
1391         c->callback = callback;
1392         c->userdata = userdata;
1393         c->serial = BUS_MESSAGE_SERIAL(m);
1394         c->timeout = calc_elapse(usec);
1395
1396         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1397         if (r < 0) {
1398                 free(c);
1399                 return r;
1400         }
1401
1402         if (c->timeout != 0) {
1403                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1404                 if (r < 0) {
1405                         c->timeout = 0;
1406                         sd_bus_send_with_reply_cancel(bus, c->serial);
1407                         return r;
1408                 }
1409         }
1410
1411         r = sd_bus_send(bus, m, serial);
1412         if (r < 0) {
1413                 sd_bus_send_with_reply_cancel(bus, c->serial);
1414                 return r;
1415         }
1416
1417         return r;
1418 }
1419
1420 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1421         struct reply_callback *c;
1422
1423         if (!bus)
1424                 return -EINVAL;
1425         if (serial == 0)
1426                 return -EINVAL;
1427
1428         c = hashmap_remove(bus->reply_callbacks, &serial);
1429         if (!c)
1430                 return 0;
1431
1432         if (c->timeout != 0)
1433                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1434
1435         free(c);
1436         return 1;
1437 }
1438
1439 int bus_ensure_running(sd_bus *bus) {
1440         int r;
1441
1442         assert(bus);
1443
1444         if (bus->fd < 0)
1445                 return -ENOTCONN;
1446         if (bus->state == BUS_UNSET)
1447                 return -ENOTCONN;
1448
1449         if (bus->state == BUS_RUNNING)
1450                 return 1;
1451
1452         for (;;) {
1453                 r = sd_bus_process(bus, NULL);
1454                 if (r < 0)
1455                         return r;
1456                 if (bus->state == BUS_RUNNING)
1457                         return 1;
1458                 if (r > 0)
1459                         continue;
1460
1461                 r = sd_bus_wait(bus, (uint64_t) -1);
1462                 if (r < 0)
1463                         return r;
1464         }
1465 }
1466
1467 int sd_bus_send_with_reply_and_block(
1468                 sd_bus *bus,
1469                 sd_bus_message *m,
1470                 uint64_t usec,
1471                 sd_bus_error *error,
1472                 sd_bus_message **reply) {
1473
1474         int r;
1475         usec_t timeout;
1476         uint64_t serial;
1477         bool room = false;
1478
1479         if (!bus)
1480                 return -EINVAL;
1481         if (bus->fd < 0)
1482                 return -ENOTCONN;
1483         if (bus->state == BUS_UNSET)
1484                 return -ENOTCONN;
1485         if (!m)
1486                 return -EINVAL;
1487         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1488                 return -EINVAL;
1489         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1490                 return -EINVAL;
1491         if (bus_error_is_dirty(error))
1492                 return -EINVAL;
1493
1494         r = bus_ensure_running(bus);
1495         if (r < 0)
1496                 return r;
1497
1498         r = sd_bus_send(bus, m, &serial);
1499         if (r < 0)
1500                 return r;
1501
1502         timeout = calc_elapse(usec);
1503
1504         for (;;) {
1505                 usec_t left;
1506                 sd_bus_message *incoming = NULL;
1507
1508                 if (!room) {
1509                         sd_bus_message **q;
1510
1511                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1512                                 return -ENOBUFS;
1513
1514                         /* Make sure there's room for queuing this
1515                          * locally, before we read the message */
1516
1517                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1518                         if (!q)
1519                                 return -ENOMEM;
1520
1521                         bus->rqueue = q;
1522                         room = true;
1523                 }
1524
1525                 r = message_read(bus, &incoming);
1526                 if (r < 0)
1527                         return r;
1528                 if (incoming) {
1529
1530                         if (incoming->reply_serial == serial) {
1531                                 /* Found a match! */
1532
1533                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1534                                         *reply = incoming;
1535                                         return 0;
1536                                 }
1537
1538                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1539                                         int k;
1540
1541                                         r = sd_bus_error_copy(error, &incoming->error);
1542                                         if (r < 0) {
1543                                                 sd_bus_message_unref(incoming);
1544                                                 return r;
1545                                         }
1546
1547                                         k = bus_error_to_errno(&incoming->error);
1548                                         sd_bus_message_unref(incoming);
1549                                         return k;
1550                                 }
1551
1552                                 sd_bus_message_unref(incoming);
1553                                 return -EIO;
1554                         }
1555
1556                         /* There's already guaranteed to be room for
1557                          * this, so need to resize things here */
1558                         bus->rqueue[bus->rqueue_size ++] = incoming;
1559                         room = false;
1560
1561                         /* Try to read more, right-away */
1562                         continue;
1563                 }
1564                 if (r != 0)
1565                         continue;
1566
1567                 if (timeout > 0) {
1568                         usec_t n;
1569
1570                         n = now(CLOCK_MONOTONIC);
1571                         if (n >= timeout)
1572                                 return -ETIMEDOUT;
1573
1574                         left = timeout - n;
1575                 } else
1576                         left = (uint64_t) -1;
1577
1578                 r = bus_poll(bus, true, left);
1579                 if (r < 0)
1580                         return r;
1581
1582                 r = dispatch_wqueue(bus);
1583                 if (r < 0)
1584                         return r;
1585         }
1586 }
1587
1588 int sd_bus_get_fd(sd_bus *bus) {
1589         if (!bus)
1590                 return -EINVAL;
1591
1592         if (bus->fd < 0)
1593                 return -ENOTCONN;
1594
1595         return bus->fd;
1596 }
1597
1598 int sd_bus_get_events(sd_bus *bus) {
1599         int flags = 0;
1600
1601         if (!bus)
1602                 return -EINVAL;
1603         if (bus->state == BUS_UNSET)
1604                 return -ENOTCONN;
1605         if (bus->fd < 0)
1606                 return -ENOTCONN;
1607
1608         if (bus->state == BUS_OPENING)
1609                 flags |= POLLOUT;
1610         else if (bus->state == BUS_AUTHENTICATING) {
1611
1612                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1613                         flags |= POLLOUT;
1614
1615                 flags |= POLLIN;
1616
1617         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1618                 if (bus->rqueue_size <= 0)
1619                         flags |= POLLIN;
1620                 if (bus->wqueue_size > 0)
1621                         flags |= POLLOUT;
1622         }
1623
1624         return flags;
1625 }
1626
1627 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1628         struct reply_callback *c;
1629
1630         if (!bus)
1631                 return -EINVAL;
1632         if (!timeout_usec)
1633                 return -EINVAL;
1634         if (bus->state == BUS_UNSET)
1635                 return -ENOTCONN;
1636         if (bus->fd < 0)
1637                 return -ENOTCONN;
1638
1639         if (bus->state == BUS_AUTHENTICATING) {
1640                 *timeout_usec = bus->auth_timeout;
1641                 return 1;
1642         }
1643
1644         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1645                 return 0;
1646
1647         c = prioq_peek(bus->reply_callbacks_prioq);
1648         if (!c)
1649                 return 0;
1650
1651         *timeout_usec = c->timeout;
1652         return 1;
1653 }
1654
1655 static int process_timeout(sd_bus *bus) {
1656         struct reply_callback *c;
1657         usec_t n;
1658         int r;
1659
1660         assert(bus);
1661
1662         c = prioq_peek(bus->reply_callbacks_prioq);
1663         if (!c)
1664                 return 0;
1665
1666         n = now(CLOCK_MONOTONIC);
1667         if (c->timeout > n)
1668                 return 0;
1669
1670         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1671         hashmap_remove(bus->reply_callbacks, &c->serial);
1672
1673         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1674         free(c);
1675
1676         return r < 0 ? r : 1;
1677 }
1678
1679 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1680         struct reply_callback *c;
1681         int r;
1682
1683         assert(bus);
1684         assert(m);
1685
1686         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1687             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1688                 return 0;
1689
1690         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1691         if (!c)
1692                 return 0;
1693
1694         if (c->timeout != 0)
1695                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1696
1697         r = c->callback(bus, 0, m, c->userdata);
1698         free(c);
1699
1700         return r;
1701 }
1702
1703 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1704         struct filter_callback *l;
1705         int r;
1706
1707         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1708                 r = l->callback(bus, 0, m, l->userdata);
1709                 if (r != 0)
1710                         return r;
1711         }
1712
1713         return 0;
1714 }
1715
1716 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1717         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1718         int r;
1719
1720         assert(bus);
1721         assert(m);
1722
1723         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1724                 return 0;
1725
1726         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1727                 return 0;
1728
1729         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1730                 return 1;
1731
1732         if (streq_ptr(m->member, "Ping"))
1733                 r = sd_bus_message_new_method_return(bus, m, &reply);
1734         else if (streq_ptr(m->member, "GetMachineId")) {
1735                 sd_id128_t id;
1736                 char sid[33];
1737
1738                 r = sd_id128_get_machine(&id);
1739                 if (r < 0)
1740                         return r;
1741
1742                 r = sd_bus_message_new_method_return(bus, m, &reply);
1743                 if (r < 0)
1744                         return r;
1745
1746                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1747         } else {
1748                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1749
1750                 sd_bus_error_set(&error,
1751                                  "org.freedesktop.DBus.Error.UnknownMethod",
1752                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1753
1754                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1755         }
1756
1757         if (r < 0)
1758                 return r;
1759
1760         r = sd_bus_send(bus, reply, NULL);
1761         if (r < 0)
1762                 return r;
1763
1764         return 1;
1765 }
1766
1767 static int process_object(sd_bus *bus, sd_bus_message *m) {
1768         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1769         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1770         struct object_callback *c;
1771         char *p;
1772         int r;
1773         bool found = false;
1774
1775         assert(bus);
1776         assert(m);
1777
1778         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1779                 return 0;
1780
1781         if (hashmap_isempty(bus->object_callbacks))
1782                 return 0;
1783
1784         c = hashmap_get(bus->object_callbacks, m->path);
1785         if (c) {
1786                 r = c->callback(bus, 0, m, c->userdata);
1787                 if (r != 0)
1788                         return r;
1789
1790                 found = true;
1791         }
1792
1793         /* Look for fallback prefixes */
1794         p = strdupa(m->path);
1795         for (;;) {
1796                 char *e;
1797
1798                 e = strrchr(p, '/');
1799                 if (e == p || !e)
1800                         break;
1801
1802                 *e = 0;
1803
1804                 c = hashmap_get(bus->object_callbacks, p);
1805                 if (c && c->is_fallback) {
1806                         r = c->callback(bus, 0, m, c->userdata);
1807                         if (r != 0)
1808                                 return r;
1809
1810                         found = true;
1811                 }
1812         }
1813
1814         if (!found)
1815                 return 0;
1816
1817         sd_bus_error_set(&error,
1818                          "org.freedesktop.DBus.Error.UnknownMethod",
1819                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1820
1821         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1822         if (r < 0)
1823                 return r;
1824
1825         r = sd_bus_send(bus, reply, NULL);
1826         if (r < 0)
1827                 return r;
1828
1829         return 1;
1830 }
1831
1832 static int process_message(sd_bus *bus, sd_bus_message *m) {
1833         int r;
1834
1835         assert(bus);
1836         assert(m);
1837
1838         r = process_reply(bus, m);
1839         if (r != 0)
1840                 return r;
1841
1842         r = process_filter(bus, m);
1843         if (r != 0)
1844                 return r;
1845
1846         r = process_builtin(bus, m);
1847         if (r != 0)
1848                 return r;
1849
1850         return process_object(bus, m);
1851 }
1852
1853 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1854         int r;
1855
1856         /* Returns 0 when we didn't do anything. This should cause the
1857          * caller to invoke sd_bus_wait() before returning the next
1858          * time. Returns > 0 when we did something, which possibly
1859          * means *ret is filled in with an unprocessed message. */
1860
1861         if (!bus)
1862                 return -EINVAL;
1863         if (bus->state == BUS_UNSET)
1864                 return -ENOTCONN;
1865         if (bus->fd < 0)
1866                 return -ENOTCONN;
1867
1868         if (bus->state == BUS_OPENING) {
1869                 struct pollfd p;
1870
1871                 zero(p);
1872                 p.fd = bus->fd;
1873                 p.events = POLLOUT;
1874
1875                 r = poll(&p, 1, 0);
1876                 if (r < 0)
1877                         return -errno;
1878
1879                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1880                         int error = 0;
1881                         socklen_t slen = sizeof(error);
1882
1883                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1884                         if (r < 0)
1885                                 bus->last_connect_error = errno;
1886                         else if (error != 0)
1887                                 bus->last_connect_error = error;
1888                         else if (p.revents & (POLLERR|POLLHUP))
1889                                 bus->last_connect_error = ECONNREFUSED;
1890                         else {
1891                                 r = bus_start_auth(bus);
1892                                 goto null_message;
1893                         }
1894
1895                         /* Try next address */
1896                         r = bus_start_connect(bus);
1897                         goto null_message;
1898                 }
1899
1900                 r = 0;
1901                 goto null_message;
1902
1903         } else if (bus->state == BUS_AUTHENTICATING) {
1904
1905                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1906                         return -ETIMEDOUT;
1907
1908                 r = bus_write_auth(bus);
1909                 if (r != 0)
1910                         goto null_message;
1911
1912                 r = bus_read_auth(bus);
1913                 goto null_message;
1914
1915         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1916                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1917                 int k;
1918
1919                 r = process_timeout(bus);
1920                 if (r != 0)
1921                         goto null_message;
1922
1923                 r = dispatch_wqueue(bus);
1924                 if (r != 0)
1925                         goto null_message;
1926
1927                 k = r;
1928                 r = dispatch_rqueue(bus, &m);
1929                 if (r < 0)
1930                         return r;
1931                 if (!m) {
1932                         if (r == 0)
1933                                 r = k;
1934                         goto null_message;
1935                 }
1936
1937                 r = process_message(bus, m);
1938                 if (r != 0)
1939                         goto null_message;
1940
1941                 if (ret) {
1942                         *ret = m;
1943                         m = NULL;
1944                         return 1;
1945                 }
1946
1947                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1948                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1949                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1950
1951                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1952
1953                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1954                         if (r < 0)
1955                                 return r;
1956
1957                         r = sd_bus_send(bus, reply, NULL);
1958                         if (r < 0)
1959                                 return r;
1960                 }
1961
1962                 return 1;
1963         }
1964
1965         assert_not_reached("Unknown state");
1966
1967 null_message:
1968         if (r >= 0 && ret)
1969                 *ret = NULL;
1970
1971         return r;
1972 }
1973
1974 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1975         struct pollfd p;
1976         int r, e;
1977         struct timespec ts;
1978         usec_t until, m;
1979
1980         assert(bus);
1981
1982         if (bus->fd < 0)
1983                 return -ENOTCONN;
1984
1985         e = sd_bus_get_events(bus);
1986         if (e < 0)
1987                 return e;
1988
1989         if (need_more)
1990                 e |= POLLIN;
1991
1992         r = sd_bus_get_timeout(bus, &until);
1993         if (r < 0)
1994                 return r;
1995         if (r == 0)
1996                 m = (uint64_t) -1;
1997         else {
1998                 usec_t n;
1999                 n = now(CLOCK_MONOTONIC);
2000                 m = until > n ? until - n : 0;
2001         }
2002
2003         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
2004                 m = timeout_usec;
2005
2006         zero(p);
2007         p.fd = bus->fd;
2008         p.events = e;
2009
2010         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
2011         if (r < 0)
2012                 return -errno;
2013
2014         return r > 0 ? 1 : 0;
2015 }
2016
2017 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
2018
2019         if (!bus)
2020                 return -EINVAL;
2021         if (bus->state == BUS_UNSET)
2022                 return -ENOTCONN;
2023         if (bus->fd < 0)
2024                 return -ENOTCONN;
2025         if (bus->rqueue_size > 0)
2026                 return 0;
2027
2028         return bus_poll(bus, false, timeout_usec);
2029 }
2030
2031 int sd_bus_flush(sd_bus *bus) {
2032         int r;
2033
2034         if (!bus)
2035                 return -EINVAL;
2036         if (bus->state == BUS_UNSET)
2037                 return -ENOTCONN;
2038         if (bus->fd < 0)
2039                 return -ENOTCONN;
2040
2041         r = bus_ensure_running(bus);
2042         if (r < 0)
2043                 return r;
2044
2045         if (bus->wqueue_size <= 0)
2046                 return 0;
2047
2048         for (;;) {
2049                 r = dispatch_wqueue(bus);
2050                 if (r < 0)
2051                         return r;
2052
2053                 if (bus->wqueue_size <= 0)
2054                         return 0;
2055
2056                 r = bus_poll(bus, false, (uint64_t) -1);
2057                 if (r < 0)
2058                         return r;
2059         }
2060 }
2061
2062 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
2063         struct filter_callback *f;
2064
2065         if (!bus)
2066                 return -EINVAL;
2067         if (!callback)
2068                 return -EINVAL;
2069
2070         f = new(struct filter_callback, 1);
2071         if (!f)
2072                 return -ENOMEM;
2073         f->callback = callback;
2074         f->userdata = userdata;
2075
2076         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
2077         return 0;
2078 }
2079
2080 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
2081         struct filter_callback *f;
2082
2083         if (!bus)
2084                 return -EINVAL;
2085         if (!callback)
2086                 return -EINVAL;
2087
2088         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2089                 if (f->callback == callback && f->userdata == userdata) {
2090                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2091                         free(f);
2092                         return 1;
2093                 }
2094         }
2095
2096         return 0;
2097 }
2098
2099 static int bus_add_object(
2100                 sd_bus *bus,
2101                 bool fallback,
2102                 const char *path,
2103                 sd_message_handler_t callback,
2104                 void *userdata) {
2105
2106         struct object_callback *c;
2107         int r;
2108
2109         if (!bus)
2110                 return -EINVAL;
2111         if (!path)
2112                 return -EINVAL;
2113         if (!callback)
2114                 return -EINVAL;
2115
2116         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2117         if (r < 0)
2118                 return r;
2119
2120         c = new(struct object_callback, 1);
2121         if (!c)
2122                 return -ENOMEM;
2123
2124         c->path = strdup(path);
2125         if (!path) {
2126                 free(c);
2127                 return -ENOMEM;
2128         }
2129
2130         c->callback = callback;
2131         c->userdata = userdata;
2132         c->is_fallback = fallback;
2133
2134         r = hashmap_put(bus->object_callbacks, c->path, c);
2135         if (r < 0) {
2136                 free(c->path);
2137                 free(c);
2138                 return r;
2139         }
2140
2141         return 0;
2142 }
2143
2144 static int bus_remove_object(
2145                 sd_bus *bus,
2146                 bool fallback,
2147                 const char *path,
2148                 sd_message_handler_t callback,
2149                 void *userdata) {
2150
2151         struct object_callback *c;
2152
2153         if (!bus)
2154                 return -EINVAL;
2155         if (!path)
2156                 return -EINVAL;
2157         if (!callback)
2158                 return -EINVAL;
2159
2160         c = hashmap_get(bus->object_callbacks, path);
2161         if (!c)
2162                 return 0;
2163
2164         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2165                 return 0;
2166
2167         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2168
2169         free(c->path);
2170         free(c);
2171
2172         return 1;
2173 }
2174
2175 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2176         return bus_add_object(bus, false, path, callback, userdata);
2177 }
2178
2179 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2180         return bus_remove_object(bus, false, path, callback, userdata);
2181 }
2182
2183 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2184         return bus_add_object(bus, true, prefix, callback, userdata);
2185 }
2186
2187 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2188         return bus_remove_object(bus, true, prefix, callback, userdata);
2189 }