chiark / gitweb /
72c790bd59a72108d67d2f1f65824db7a13b988a
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "missing.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         struct object_callback *c;
44         unsigned i;
45
46         assert(b);
47
48         if (b->fd >= 0)
49                 close_nointr_nofail(b->fd);
50
51         free(b->rbuffer);
52         free(b->unique_name);
53         free(b->auth_uid);
54         free(b->address);
55
56         close_many(b->fds, b->n_fds);
57         free(b->fds);
58
59         for (i = 0; i < b->rqueue_size; i++)
60                 sd_bus_message_unref(b->rqueue[i]);
61         free(b->rqueue);
62
63         for (i = 0; i < b->wqueue_size; i++)
64                 sd_bus_message_unref(b->wqueue[i]);
65         free(b->wqueue);
66
67         hashmap_free_free(b->reply_callbacks);
68         prioq_free(b->reply_callbacks_prioq);
69
70         while ((f = b->filter_callbacks)) {
71                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
72                 free(f);
73         }
74
75         while ((c = hashmap_steal_first(b->object_callbacks))) {
76                 free(c->path);
77                 free(c);
78         }
79
80         hashmap_free(b->object_callbacks);
81
82         free(b);
83 }
84
85 static sd_bus* bus_new(void) {
86         sd_bus *r;
87
88         r = new0(sd_bus, 1);
89         if (!r)
90                 return NULL;
91
92         r->n_ref = 1;
93         r->fd = -1;
94         r->message_version = 1;
95
96         /* We guarantee that wqueue always has space for at least one
97          * entry */
98         r->wqueue = new(sd_bus_message*, 1);
99         if (!r->wqueue) {
100                 free(r);
101                 return NULL;
102         }
103
104         return r;
105 };
106
107 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
108         const char *s;
109         int r;
110
111         assert(bus);
112
113         if (error != 0)
114                 return -error;
115
116         assert(reply);
117
118         r = sd_bus_message_read(reply, "s", &s);
119         if (r < 0)
120                 return r;
121
122         if (!service_name_is_valid(s) || s[0] != ':')
123                 return -EBADMSG;
124
125         bus->unique_name = strdup(s);
126         if (!bus->unique_name)
127                 return -ENOMEM;
128
129         bus->state = BUS_RUNNING;
130
131         return 1;
132 }
133
134 static int bus_send_hello(sd_bus *bus) {
135         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
136         int r;
137
138         assert(bus);
139
140         r = sd_bus_message_new_method_call(
141                         bus,
142                         "org.freedesktop.DBus",
143                         "/",
144                         "org.freedesktop.DBus",
145                         "Hello",
146                         &m);
147         if (r < 0)
148                 return r;
149
150         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
151         if (r < 0)
152                 return r;
153
154         bus->sent_hello = true;
155         return r;
156 }
157
158 static int bus_start_running(sd_bus *bus) {
159         assert(bus);
160
161         if (bus->sent_hello) {
162                 bus->state = BUS_HELLO;
163                 return 1;
164         }
165
166         bus->state = BUS_RUNNING;
167         return 1;
168 }
169
/* Tries to parse one "key=value" pair at *p for the given key.
 *
 * The value runs up to the next ',' or the end of the string; "%XX"
 * sequences in it are replaced by the byte with that hex code.
 *
 * Returns 0 (and changes nothing) if *p does not start with "key=".
 * Returns 1 on a match: *value receives a newly allocated copy of the
 * decoded value and *p is moved past the pair (and past a trailing
 * comma). Returns -EINVAL if *value is already set, the negative result
 * of unhexchar() on a bad escape, or -ENOMEM. */
static int parse_address_key(const char **p, const char *key, char **value) {
        size_t key_len, used = 0;
        const char *cur;
        char *buf = NULL;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        key_len = strlen(key);
        if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                return 0;

        /* The same key must not be specified twice */
        if (*value)
                return -EINVAL;

        for (cur = *p + key_len + 1; *cur != 0 && *cur != ',';) {
                char ch, *grown;

                if (*cur != '%')
                        ch = *(cur++);
                else {
                        int hi, lo;

                        hi = unhexchar(cur[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cur[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cur += 3;
                }

                /* Room for the new character plus the trailing NUL */
                grown = realloc(buf, used + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[used++] = ch;
        }

        if (buf)
                buf[used] = 0;
        else {
                /* Empty value */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cur == ',')
                cur++;

        *p = cur;
        *value = buf;
        return 1;
}
240
/* Moves *p past the current "key=value" pair: up to the next ',' (which
 * is skipped too) or to the end of the string. */
static void skip_address_key(const char **p) {
        const char *cur;

        assert(p);
        assert(*p);

        cur = *p;
        while (*cur != 0 && *cur != ',')
                cur++;

        if (*cur == ',')
                cur++;

        *p = cur;
}
250
251 static int bus_parse_next_address(sd_bus *b) {
252         const char *a, *p;
253         _cleanup_free_ char *guid = NULL;
254         int r;
255
256         assert(b);
257
258         if (!b->address)
259                 return 0;
260         if (b->address[b->address_index] == 0)
261                 return 0;
262
263         a = b->address + b->address_index;
264
265         zero(b->sockaddr);
266         b->sockaddr_size = 0;
267         b->peer = SD_ID128_NULL;
268
269         if (startswith(a, "unix:")) {
270                 _cleanup_free_ char *path = NULL, *abstract = NULL;
271
272                 p = a + 5;
273                 while (*p != 0) {
274                         r = parse_address_key(&p, "guid", &guid);
275                         if (r < 0)
276                                 return r;
277                         else if (r > 0)
278                                 continue;
279
280                         r = parse_address_key(&p, "path", &path);
281                         if (r < 0)
282                                 return r;
283                         else if (r > 0)
284                                 continue;
285
286                         r = parse_address_key(&p, "abstract", &abstract);
287                         if (r < 0)
288                                 return r;
289                         else if (r > 0)
290                                 continue;
291
292                         skip_address_key(&p);
293                 }
294
295                 if (!path && !abstract)
296                         return -EINVAL;
297
298                 if (path && abstract)
299                         return -EINVAL;
300
301                 if (path) {
302                         size_t l;
303
304                         l = strlen(path);
305                         if (l > sizeof(b->sockaddr.un.sun_path))
306                                 return -E2BIG;
307
308                         b->sockaddr.un.sun_family = AF_UNIX;
309                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
310                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
311                 } else if (abstract) {
312                         size_t l;
313
314                         l = strlen(abstract);
315                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
316                                 return -E2BIG;
317
318                         b->sockaddr.un.sun_family = AF_UNIX;
319                         b->sockaddr.un.sun_path[0] = 0;
320                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
321                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
322                 }
323
324         } else if (startswith(a, "tcp:")) {
325                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
326                 struct addrinfo hints, *result;
327
328                 p = a + 4;
329                 while (*p != 0) {
330                         r = parse_address_key(&p, "guid", &guid);
331                         if (r < 0)
332                                 return r;
333                         else if (r > 0)
334                                 continue;
335
336                         r = parse_address_key(&p, "host", &host);
337                         if (r < 0)
338                                 return r;
339                         else if (r > 0)
340                                 continue;
341
342                         r = parse_address_key(&p, "port", &port);
343                         if (r < 0)
344                                 return r;
345                         else if (r > 0)
346                                 continue;
347
348                         r = parse_address_key(&p, "family", &family);
349                         if (r < 0)
350                                 return r;
351                         else if (r > 0)
352                                 continue;
353
354                         skip_address_key(&p);
355                 }
356
357                 if (!host || !port)
358                         return -EINVAL;
359
360                 zero(hints);
361                 hints.ai_socktype = SOCK_STREAM;
362                 hints.ai_flags = AI_ADDRCONFIG;
363
364                 if (family) {
365                         if (streq(family, "ipv4"))
366                                 hints.ai_family = AF_INET;
367                         else if (streq(family, "ipv6"))
368                                 hints.ai_family = AF_INET6;
369                         else
370                                 return -EINVAL;
371                 }
372
373                 r = getaddrinfo(host, port, &hints, &result);
374                 if (r == EAI_SYSTEM)
375                         return -errno;
376                 else if (r != 0)
377                         return -EADDRNOTAVAIL;
378
379                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
380                 b->sockaddr_size = result->ai_addrlen;
381
382                 freeaddrinfo(result);
383         }
384
385         if (guid) {
386                 r = sd_id128_from_string(guid, &b->peer);
387                 if (r < 0)
388                         return r;
389         }
390
391         b->address_index = p - b->address;
392         return 1;
393 }
394
/* Consumes "size" bytes from the iovec array, starting at entry *idx.
 *
 * Entries that are consumed completely are reset to NULL/0 and *idx is
 * moved past them; an entry that is consumed only partially has its
 * base advanced and its length reduced. The caller must make sure the
 * array actually holds at least "size" bytes from *idx on. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
        struct iovec *cur;

        for (; size > 0; (*idx)++) {
                cur = iov + *idx;

                if (cur->iov_len > size) {
                        /* Partially consumed: shrink it and stop here */
                        cur->iov_base = (uint8_t*) cur->iov_base + size;
                        cur->iov_len -= size;
                        return;
                }

                /* Fully consumed: clear it and go on with the next one */
                size -= cur->iov_len;

                cur->iov_base = NULL;
                cur->iov_len = 0;
        }
}
414
415 static int bus_write_auth(sd_bus *b) {
416         struct msghdr mh;
417         ssize_t k;
418
419         assert(b);
420         assert(b->state == BUS_AUTHENTICATING);
421
422         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
423                 return 0;
424
425         if (b->auth_timeout == 0)
426                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
427
428         zero(mh);
429         mh.msg_iov = b->auth_iovec + b->auth_index;
430         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
431
432         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
433         if (k < 0)
434                 return errno == EAGAIN ? 0 : -errno;
435
436         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
437
438         return 1;
439 }
440
441 static int bus_auth_verify(sd_bus *b) {
442         char *e, *f;
443         sd_id128_t peer;
444         unsigned i;
445         int r;
446
447         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
448          * that's it */
449
450         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
451         if (!e)
452                 return 0;
453
454         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
455         if (!f)
456                 return 0;
457
458         if (e - (char*) b->rbuffer != 3 + 32)
459                 return -EPERM;
460
461         if (memcmp(b->rbuffer, "OK ", 3))
462                 return -EPERM;
463
464         for (i = 0; i < 32; i += 2) {
465                 int x, y;
466
467                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
468                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
469
470                 if (x < 0 || y < 0)
471                         return -EINVAL;
472
473                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
474         }
475
476         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
477             !sd_id128_equal(b->peer, peer))
478                 return -EPERM;
479
480         b->peer = peer;
481
482         b->can_fds =
483                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
484                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
485
486         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
487         memmove(b->rbuffer, f + 2, b->rbuffer_size);
488
489         r = bus_start_running(b);
490         if (r < 0)
491                 return r;
492
493         return 1;
494 }
495
496 static int bus_read_auth(sd_bus *b) {
497         struct msghdr mh;
498         struct iovec iov;
499         size_t n;
500         ssize_t k;
501         int r;
502         void *p;
503
504         assert(b);
505
506         r = bus_auth_verify(b);
507         if (r != 0)
508                 return r;
509
510         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
511
512         if (n > BUS_AUTH_SIZE_MAX)
513                 n = BUS_AUTH_SIZE_MAX;
514
515         if (b->rbuffer_size >= n)
516                 return -ENOBUFS;
517
518         p = realloc(b->rbuffer, n);
519         if (!p)
520                 return -ENOMEM;
521
522         b->rbuffer = p;
523
524         zero(iov);
525         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
526         iov.iov_len = n - b->rbuffer_size;
527
528         zero(mh);
529         mh.msg_iov = &iov;
530         mh.msg_iovlen = 1;
531
532         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
533         if (k < 0)
534                 return errno == EAGAIN ? 0 : -errno;
535
536         b->rbuffer_size += k;
537
538         r = bus_auth_verify(b);
539         if (r != 0)
540                 return r;
541
542         return 1;
543 }
544
545 static int bus_setup_fd(sd_bus *b) {
546         int one;
547
548         assert(b);
549
550         /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
551          * just in case. This is actually irrelavant for */
552         one = 1;
553         setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
554         setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
555
556         /* Increase the buffers to a MB */
557         fd_inc_rcvbuf(b->fd, 1024*1024);
558         fd_inc_sndbuf(b->fd, 1024*1024);
559
560         return 0;
561 }
562
563 static int bus_start_auth(sd_bus *b) {
564         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
565         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
566
567         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
568         size_t l;
569
570         assert(b);
571
572         b->state = BUS_AUTHENTICATING;
573
574         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
575         char_array_0(text);
576
577         l = strlen(text);
578         b->auth_uid = hexmem(text, l);
579         if (!b->auth_uid)
580                 return -ENOMEM;
581
582         b->auth_iovec[0].iov_base = (void*) auth_prefix;
583         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
584         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
585         b->auth_iovec[1].iov_len = l * 2;
586         b->auth_iovec[2].iov_base = (void*) auth_suffix;
587         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
588         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
589
590         return bus_write_auth(b);
591 }
592
593 static int bus_start_connect(sd_bus *b) {
594         int r;
595
596         assert(b);
597         assert(b->fd < 0);
598
599         for (;;) {
600                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
601                         r = bus_parse_next_address(b);
602                         if (r < 0)
603                                 return r;
604                         if (r == 0)
605                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
606                 }
607
608                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
609                 if (b->fd < 0) {
610                         b->last_connect_error = errno;
611                         goto try_again;
612                 }
613
614                 r = bus_setup_fd(b);
615                 if (r < 0) {
616                         b->last_connect_error = errno;
617                         goto try_again;
618                 }
619
620                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
621                 if (r < 0) {
622                         if (errno == EINPROGRESS)
623                                 return 1;
624
625                         b->last_connect_error = errno;
626                         goto try_again;
627                 }
628
629                 return bus_start_auth(b);
630
631         try_again:
632                 zero(b->sockaddr);
633
634                 if (b->fd >= 0) {
635                         close_nointr_nofail(b->fd);
636                         b->fd = -1;
637                 }
638         }
639 }
640
641 int sd_bus_open_system(sd_bus **ret) {
642         const char *e;
643         sd_bus *b;
644         int r;
645
646         if (!ret)
647                 return -EINVAL;
648
649         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
650         if (e) {
651                 r = sd_bus_open_address(e, &b);
652                 if (r < 0)
653                         return r;
654         } else {
655                 b = bus_new();
656                 if (!b)
657                         return -ENOMEM;
658
659                 b->sockaddr.un.sun_family = AF_UNIX;
660                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
661                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
662
663                 r = bus_start_connect(b);
664                 if (r < 0) {
665                         bus_free(b);
666                         return r;
667                 }
668         }
669
670         r = bus_send_hello(b);
671         if (r < 0) {
672                 sd_bus_unref(b);
673                 return r;
674         }
675
676         *ret = b;
677         return 0;
678 }
679
680 int sd_bus_open_user(sd_bus **ret) {
681         const char *e;
682         sd_bus *b;
683         size_t l;
684         int r;
685
686         if (!ret)
687                 return -EINVAL;
688
689         e = getenv("DBUS_SESSION_BUS_ADDRESS");
690         if (e) {
691                 r = sd_bus_open_address(e, &b);
692                 if (r < 0)
693                         return r;
694         } else {
695                 e = getenv("XDG_RUNTIME_DIR");
696                 if (!e)
697                         return -ENOENT;
698
699                 l = strlen(e);
700                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
701                         return -E2BIG;
702
703                 b = bus_new();
704                 if (!b)
705                         return -ENOMEM;
706
707                 b->sockaddr.un.sun_family = AF_UNIX;
708                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
709                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
710
711                 r = bus_start_connect(b);
712                 if (r < 0) {
713                         bus_free(b);
714                         return r;
715                 }
716         }
717
718         r = bus_send_hello(b);
719         if (r < 0) {
720                 sd_bus_unref(b);
721                 return r;
722         }
723
724         *ret = b;
725         return 0;
726 }
727
728 int sd_bus_open_address(const char *address, sd_bus **ret) {
729         sd_bus *b;
730         int r;
731
732         if (!address)
733                 return -EINVAL;
734         if (!ret)
735                 return -EINVAL;
736
737         b = bus_new();
738         if (!b)
739                 return -ENOMEM;
740
741         b->address = strdup(address);
742         if (!b->address) {
743                 bus_free(b);
744                 return -ENOMEM;
745         }
746
747         r = bus_start_connect(b);
748         if (r < 0) {
749                 bus_free(b);
750                 return r;
751         }
752
753         *ret = b;
754         return 0;
755 }
756
757 int sd_bus_open_fd(int fd, sd_bus **ret) {
758         sd_bus *b;
759         int r;
760
761         if (fd < 0)
762                 return -EINVAL;
763         if (!ret)
764                 return -EINVAL;
765
766         b = bus_new();
767         if (!b)
768                 return -ENOMEM;
769
770         b->fd = fd;
771
772         r = fd_nonblock(b->fd, true);
773         if (r < 0)
774                 goto fail;
775
776         fd_cloexec(b->fd, true);
777         if (r < 0)
778                 goto fail;
779
780         r = bus_setup_fd(b);
781         if (r < 0)
782                 goto fail;
783
784         r = bus_start_auth(b);
785         if (r < 0)
786                 goto fail;
787
788         *ret = b;
789         return 0;
790
791 fail:
792                 bus_free(b);
793         return r;
794 }
795
796 void sd_bus_close(sd_bus *bus) {
797         if (!bus)
798                 return;
799         if (bus->fd < 0)
800                 return;
801
802         close_nointr_nofail(bus->fd);
803         bus->fd = -1;
804 }
805
806 sd_bus *sd_bus_ref(sd_bus *bus) {
807         if (!bus)
808                 return NULL;
809
810         assert(bus->n_ref > 0);
811
812         bus->n_ref++;
813         return bus;
814 }
815
816 sd_bus *sd_bus_unref(sd_bus *bus) {
817         if (!bus)
818                 return NULL;
819
820         assert(bus->n_ref > 0);
821         bus->n_ref--;
822
823         if (bus->n_ref <= 0)
824                 bus_free(bus);
825
826         return NULL;
827 }
828
829 int sd_bus_is_open(sd_bus *bus) {
830         if (!bus)
831                 return -EINVAL;
832
833         return bus->fd >= 0;
834 }
835
836 int sd_bus_can_send(sd_bus *bus, char type) {
837         int r;
838
839         if (!bus)
840                 return -EINVAL;
841
842         if (type == SD_BUS_TYPE_UNIX_FD) {
843                 r = bus_ensure_running(bus);
844                 if (r < 0)
845                         return r;
846
847                 return bus->can_fds;
848         }
849
850         return bus_type_is_valid(type);
851 }
852
853 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
854         int r;
855
856         if (!bus)
857                 return -EINVAL;
858         if (!peer)
859                 return -EINVAL;
860
861         r = bus_ensure_running(bus);
862         if (r < 0)
863                 return r;
864
865         *peer = bus->peer;
866         return 0;
867 }
868
869 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
870         assert(m);
871
872         if (m->header->version > b->message_version)
873                 return -EPERM;
874
875         if (m->sealed)
876                 return 0;
877
878         return bus_message_seal(m, ++b->serial);
879 }
880
881 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
882         struct msghdr mh;
883         struct iovec *iov;
884         ssize_t k;
885         size_t n;
886         unsigned j;
887
888         assert(bus);
889         assert(m);
890         assert(idx);
891         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
892
893         if (*idx >= m->size)
894                 return 0;
895         zero(mh);
896
897         if (m->n_fds > 0) {
898                 struct cmsghdr *control;
899                 control = alloca(CMSG_SPACE(sizeof(int) * m->n_fds));
900
901                 mh.msg_control = control;
902                 control->cmsg_level = SOL_SOCKET;
903                 control->cmsg_type = SCM_RIGHTS;
904                 mh.msg_controllen = control->cmsg_len = CMSG_LEN(sizeof(int) * m->n_fds);
905                 memcpy(CMSG_DATA(control), m->fds, sizeof(int) * m->n_fds);
906         }
907
908         n = m->n_iovec * sizeof(struct iovec);
909         iov = alloca(n);
910         memcpy(iov, m->iovec, n);
911
912         j = 0;
913         iovec_advance(iov, &j, *idx);
914
915         mh.msg_iov = iov;
916         mh.msg_iovlen = m->n_iovec;
917
918         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
919         if (k < 0)
920                 return errno == EAGAIN ? 0 : -errno;
921
922         *idx += (size_t) k;
923         return 1;
924 }
925
926 static int message_read_need(sd_bus *bus, size_t *need) {
927         uint32_t a, b;
928         uint8_t e;
929         uint64_t sum;
930
931         assert(bus);
932         assert(need);
933         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
934
935         if (bus->rbuffer_size < sizeof(struct bus_header)) {
936                 *need = sizeof(struct bus_header) + 8;
937
938                 /* Minimum message size:
939                  *
940                  * Header +
941                  *
942                  *  Method Call: +2 string headers
943                  *       Signal: +3 string headers
944                  * Method Error: +1 string headers
945                  *               +1 uint32 headers
946                  * Method Reply: +1 uint32 headers
947                  *
948                  * A string header is at least 9 bytes
949                  * A uint32 header is at least 8 bytes
950                  *
951                  * Hence the minimum message size of a valid message
952                  * is header + 8 bytes */
953
954                 return 0;
955         }
956
957         a = ((const uint32_t*) bus->rbuffer)[1];
958         b = ((const uint32_t*) bus->rbuffer)[3];
959
960         e = ((const uint8_t*) bus->rbuffer)[0];
961         if (e == SD_BUS_LITTLE_ENDIAN) {
962                 a = le32toh(a);
963                 b = le32toh(b);
964         } else if (e == SD_BUS_BIG_ENDIAN) {
965                 a = be32toh(a);
966                 b = be32toh(b);
967         } else
968                 return -EBADMSG;
969
970         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
971         if (sum >= BUS_MESSAGE_SIZE_MAX)
972                 return -ENOBUFS;
973
974         *need = (size_t) sum;
975         return 0;
976 }
977
978 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
979         sd_bus_message *t;
980         void *b;
981         int r;
982
983         assert(bus);
984         assert(m);
985         assert(bus->rbuffer_size >= size);
986         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
987
988         if (bus->rbuffer_size > size) {
989                 b = memdup((const uint8_t*) bus->rbuffer + size,
990                            bus->rbuffer_size - size);
991                 if (!b)
992                         return -ENOMEM;
993         } else
994                 b = NULL;
995
996         r = bus_message_from_malloc(bus->rbuffer, size,
997                                     bus->fds, bus->n_fds,
998                                     bus->ucred_valid ? &bus->ucred : NULL,
999                                     bus->label[0] ? bus->label : NULL,
1000                                     &t);
1001         if (r < 0) {
1002                 free(b);
1003                 return r;
1004         }
1005
1006         bus->rbuffer = b;
1007         bus->rbuffer_size -= size;
1008
1009         bus->fds = NULL;
1010         bus->n_fds = 0;
1011
1012         *m = t;
1013         return 1;
1014 }
1015
1016 static int message_read(sd_bus *bus, sd_bus_message **m) {
1017         struct msghdr mh;
1018         struct iovec iov;
1019         ssize_t k;
1020         size_t need;
1021         int r;
1022         void *b;
1023         union {
1024                 struct cmsghdr cmsghdr;
1025                 uint8_t buf[CMSG_SPACE(sizeof(int) * BUS_FDS_MAX) +
1026                             CMSG_SPACE(sizeof(struct ucred)) +
1027                             CMSG_SPACE(NAME_MAX)]; /*selinux label */
1028         } control;
1029         struct cmsghdr *cmsg;
1030
1031         assert(bus);
1032         assert(m);
1033         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1034
1035         r = message_read_need(bus, &need);
1036         if (r < 0)
1037                 return r;
1038
1039         if (bus->rbuffer_size >= need)
1040                 return message_make(bus, need, m);
1041
1042         b = realloc(bus->rbuffer, need);
1043         if (!b)
1044                 return -ENOMEM;
1045
1046         bus->rbuffer = b;
1047
1048         zero(iov);
1049         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1050         iov.iov_len = need - bus->rbuffer_size;
1051
1052         zero(mh);
1053         mh.msg_iov = &iov;
1054         mh.msg_iovlen = 1;
1055         mh.msg_control = &control;
1056         mh.msg_controllen = sizeof(control);
1057
1058         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1059         if (k < 0)
1060                 return errno == EAGAIN ? 0 : -errno;
1061
1062         bus->rbuffer_size += k;
1063
1064         for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1065                 if (cmsg->cmsg_level == SOL_SOCKET &&
1066                     cmsg->cmsg_type == SCM_RIGHTS) {
1067                         int n, *f;
1068
1069                         n = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
1070
1071                         f = realloc(bus->fds, sizeof(int) + (bus->n_fds + n));
1072                         if (!f) {
1073                                 close_many((int*) CMSG_DATA(cmsg), n);
1074                                 return -ENOMEM;
1075                         }
1076
1077                         memcpy(f + bus->n_fds, CMSG_DATA(cmsg), n * sizeof(int));
1078                         bus->fds = f;
1079                         bus->n_fds += n;
1080                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1081                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1082                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1083
1084                         memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1085                         bus->ucred_valid = true;
1086
1087                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1088                          cmsg->cmsg_type == SCM_SECURITY) {
1089
1090                         size_t l;
1091                         l = cmsg->cmsg_len - CMSG_LEN(0);
1092                         memcpy(&bus->label, CMSG_DATA(cmsg), l);
1093                         bus->label[l] = 0;
1094                 }
1095         }
1096
1097         r = message_read_need(bus, &need);
1098         if (r < 0)
1099                 return r;
1100
1101         if (bus->rbuffer_size >= need)
1102                 return message_make(bus, need, m);
1103
1104         return 1;
1105 }
1106
1107 static int dispatch_wqueue(sd_bus *bus) {
1108         int r, ret = 0;
1109
1110         assert(bus);
1111         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1112
1113         if (bus->fd < 0)
1114                 return -ENOTCONN;
1115
1116         while (bus->wqueue_size > 0) {
1117
1118                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1119                 if (r < 0) {
1120                         sd_bus_close(bus);
1121                         return r;
1122                 } else if (r == 0)
1123                         /* Didn't do anything this time */
1124                         return ret;
1125                 else if (bus->windex >= bus->wqueue[0]->size) {
1126                         /* Fully written. Let's drop the entry from
1127                          * the queue.
1128                          *
1129                          * This isn't particularly optimized, but
1130                          * well, this is supposed to be our worst-case
1131                          * buffer only, and the socket buffer is
1132                          * supposed to be our primary buffer, and if
1133                          * it got full, then all bets are off
1134                          * anyway. */
1135
1136                         sd_bus_message_unref(bus->wqueue[0]);
1137                         bus->wqueue_size --;
1138                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1139                         bus->windex = 0;
1140
1141                         ret = 1;
1142                 }
1143         }
1144
1145         return ret;
1146 }
1147
1148 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1149         sd_bus_message *z = NULL;
1150         int r, ret = 0;
1151
1152         assert(bus);
1153         assert(m);
1154         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1155
1156         if (bus->fd < 0)
1157                 return -ENOTCONN;
1158
1159         if (bus->rqueue_size > 0) {
1160                 /* Dispatch a queued message */
1161
1162                 *m = bus->rqueue[0];
1163                 bus->rqueue_size --;
1164                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1165                 return 1;
1166         }
1167
1168         /* Try to read a new message */
1169         do {
1170                 r = message_read(bus, &z);
1171                 if (r < 0) {
1172                         sd_bus_close(bus);
1173                         return r;
1174                 }
1175                 if (r == 0)
1176                         return ret;
1177
1178                 r = 1;
1179         } while (!z);
1180
1181         *m = z;
1182         return 1;
1183 }
1184
1185 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1186         int r;
1187
1188         if (!bus)
1189                 return -EINVAL;
1190         if (bus->fd < 0)
1191                 return -ENOTCONN;
1192         if (!m)
1193                 return -EINVAL;
1194         if (m->n_fds > 0 && !bus->can_fds)
1195                 return -ENOTSUP;
1196
1197         /* If the serial number isn't kept, then we know that no reply
1198          * is expected */
1199         if (!serial && !m->sealed)
1200                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1201
1202         r = bus_seal_message(bus, m);
1203         if (r < 0)
1204                 return r;
1205
1206         /* If this is a reply and no reply was requested, then let's
1207          * suppress this, if we can */
1208         if (m->dont_send && !serial)
1209                 return 0;
1210
1211         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1212                 size_t idx = 0;
1213
1214                 r = message_write(bus, m, &idx);
1215                 if (r < 0) {
1216                         sd_bus_close(bus);
1217                         return r;
1218                 } else if (idx < m->size)  {
1219                         /* Wasn't fully written. So let's remember how
1220                          * much was written. Note that the first entry
1221                          * of the wqueue array is always allocated so
1222                          * that we always can remember how much was
1223                          * written. */
1224                         bus->wqueue[0] = sd_bus_message_ref(m);
1225                         bus->wqueue_size = 1;
1226                         bus->windex = idx;
1227                 }
1228         } else {
1229                 sd_bus_message **q;
1230
1231                 /* Just append it to the queue. */
1232
1233                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1234                         return -ENOBUFS;
1235
1236                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1237                 if (!q)
1238                         return -ENOMEM;
1239
1240                 bus->wqueue = q;
1241                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1242         }
1243
1244         if (serial)
1245                 *serial = BUS_MESSAGE_SERIAL(m);
1246
1247         return 0;
1248 }
1249
1250 static usec_t calc_elapse(uint64_t usec) {
1251         if (usec == (uint64_t) -1)
1252                 return 0;
1253
1254         if (usec == 0)
1255                 usec = BUS_DEFAULT_TIMEOUT;
1256
1257         return now(CLOCK_MONOTONIC) + usec;
1258 }
1259
1260 static int timeout_compare(const void *a, const void *b) {
1261         const struct reply_callback *x = a, *y = b;
1262
1263         if (x->timeout != 0 && y->timeout == 0)
1264                 return -1;
1265
1266         if (x->timeout == 0 && y->timeout != 0)
1267                 return 1;
1268
1269         if (x->timeout < y->timeout)
1270                 return -1;
1271
1272         if (x->timeout > y->timeout)
1273                 return 1;
1274
1275         return 0;
1276 }
1277
1278 int sd_bus_send_with_reply(
1279                 sd_bus *bus,
1280                 sd_bus_message *m,
1281                 sd_message_handler_t callback,
1282                 void *userdata,
1283                 uint64_t usec,
1284                 uint64_t *serial) {
1285
1286         struct reply_callback *c;
1287         int r;
1288
1289         if (!bus)
1290                 return -EINVAL;
1291         if (bus->fd < 0)
1292                 return -ENOTCONN;
1293         if (!m)
1294                 return -EINVAL;
1295         if (!callback)
1296                 return -EINVAL;
1297         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1298                 return -EINVAL;
1299         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1300                 return -EINVAL;
1301
1302         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1303         if (r < 0)
1304                 return r;
1305
1306         if (usec != (uint64_t) -1) {
1307                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1308                 if (r < 0)
1309                         return r;
1310         }
1311
1312         r = bus_seal_message(bus, m);
1313         if (r < 0)
1314                 return r;
1315
1316         c = new(struct reply_callback, 1);
1317         if (!c)
1318                 return -ENOMEM;
1319
1320         c->callback = callback;
1321         c->userdata = userdata;
1322         c->serial = BUS_MESSAGE_SERIAL(m);
1323         c->timeout = calc_elapse(usec);
1324
1325         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1326         if (r < 0) {
1327                 free(c);
1328                 return r;
1329         }
1330
1331         if (c->timeout != 0) {
1332                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1333                 if (r < 0) {
1334                         c->timeout = 0;
1335                         sd_bus_send_with_reply_cancel(bus, c->serial);
1336                         return r;
1337                 }
1338         }
1339
1340         r = sd_bus_send(bus, m, serial);
1341         if (r < 0) {
1342                 sd_bus_send_with_reply_cancel(bus, c->serial);
1343                 return r;
1344         }
1345
1346         return r;
1347 }
1348
1349 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1350         struct reply_callback *c;
1351
1352         if (!bus)
1353                 return -EINVAL;
1354         if (serial == 0)
1355                 return -EINVAL;
1356
1357         c = hashmap_remove(bus->reply_callbacks, &serial);
1358         if (!c)
1359                 return 0;
1360
1361         if (c->timeout != 0)
1362                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1363
1364         free(c);
1365         return 1;
1366 }
1367
1368 int bus_ensure_running(sd_bus *bus) {
1369         int r;
1370
1371         assert(bus);
1372
1373         if (bus->state == BUS_RUNNING)
1374                 return 1;
1375
1376         for (;;) {
1377                 r = sd_bus_process(bus, NULL);
1378                 if (r < 0)
1379                         return r;
1380                 if (bus->state == BUS_RUNNING)
1381                         return 1;
1382                 if (r > 0)
1383                         continue;
1384
1385                 r = sd_bus_wait(bus, (uint64_t) -1);
1386                 if (r < 0)
1387                         return r;
1388         }
1389 }
1390
1391 int sd_bus_send_with_reply_and_block(
1392                 sd_bus *bus,
1393                 sd_bus_message *m,
1394                 uint64_t usec,
1395                 sd_bus_error *error,
1396                 sd_bus_message **reply) {
1397
1398         int r;
1399         usec_t timeout;
1400         uint64_t serial;
1401         bool room = false;
1402
1403         if (!bus)
1404                 return -EINVAL;
1405         if (bus->fd < 0)
1406                 return -ENOTCONN;
1407         if (!m)
1408                 return -EINVAL;
1409         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1410                 return -EINVAL;
1411         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1412                 return -EINVAL;
1413         if (bus_error_is_dirty(error))
1414                 return -EINVAL;
1415
1416         r = bus_ensure_running(bus);
1417         if (r < 0)
1418                 return r;
1419
1420         r = sd_bus_send(bus, m, &serial);
1421         if (r < 0)
1422                 return r;
1423
1424         timeout = calc_elapse(usec);
1425
1426         for (;;) {
1427                 usec_t left;
1428                 sd_bus_message *incoming = NULL;
1429
1430                 if (!room) {
1431                         sd_bus_message **q;
1432
1433                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1434                                 return -ENOBUFS;
1435
1436                         /* Make sure there's room for queuing this
1437                          * locally, before we read the message */
1438
1439                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1440                         if (!q)
1441                                 return -ENOMEM;
1442
1443                         bus->rqueue = q;
1444                         room = true;
1445                 }
1446
1447                 r = message_read(bus, &incoming);
1448                 if (r < 0)
1449                         return r;
1450                 if (incoming) {
1451
1452                         if (incoming->reply_serial == serial) {
1453                                 /* Found a match! */
1454
1455                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1456                                         *reply = incoming;
1457                                         return 0;
1458                                 }
1459
1460                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1461                                         int k;
1462
1463                                         r = sd_bus_error_copy(error, &incoming->error);
1464                                         if (r < 0) {
1465                                                 sd_bus_message_unref(incoming);
1466                                                 return r;
1467                                         }
1468
1469                                         k = bus_error_to_errno(&incoming->error);
1470                                         sd_bus_message_unref(incoming);
1471                                         return k;
1472                                 }
1473
1474                                 sd_bus_message_unref(incoming);
1475                                 return -EIO;
1476                         }
1477
1478                         /* There's already guaranteed to be room for
1479                          * this, so need to resize things here */
1480                         bus->rqueue[bus->rqueue_size ++] = incoming;
1481                         room = false;
1482
1483                         /* Try to read more, right-away */
1484                         continue;
1485                 }
1486                 if (r != 0)
1487                         continue;
1488
1489                 if (timeout > 0) {
1490                         usec_t n;
1491
1492                         n = now(CLOCK_MONOTONIC);
1493                         if (n >= timeout)
1494                                 return -ETIMEDOUT;
1495
1496                         left = timeout - n;
1497                 } else
1498                         left = (uint64_t) -1;
1499
1500                 r = bus_poll(bus, true, left);
1501                 if (r < 0)
1502                         return r;
1503
1504                 r = dispatch_wqueue(bus);
1505                 if (r < 0)
1506                         return r;
1507         }
1508 }
1509
1510 int sd_bus_get_fd(sd_bus *bus) {
1511         if (!bus)
1512                 return -EINVAL;
1513
1514         if (bus->fd < 0)
1515                 return -ENOTCONN;
1516
1517         return bus->fd;
1518 }
1519
1520 int sd_bus_get_events(sd_bus *bus) {
1521         int flags = 0;
1522
1523         if (!bus)
1524                 return -EINVAL;
1525         if (bus->fd < 0)
1526                 return -ENOTCONN;
1527
1528         if (bus->state == BUS_OPENING)
1529                 flags |= POLLOUT;
1530         else if (bus->state == BUS_AUTHENTICATING) {
1531
1532                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1533                         flags |= POLLOUT;
1534
1535                 flags |= POLLIN;
1536
1537         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1538                 if (bus->rqueue_size <= 0)
1539                         flags |= POLLIN;
1540                 if (bus->wqueue_size > 0)
1541                         flags |= POLLOUT;
1542         }
1543
1544         return flags;
1545 }
1546
1547 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1548         struct reply_callback *c;
1549
1550         if (!bus)
1551                 return -EINVAL;
1552         if (!timeout_usec)
1553                 return -EINVAL;
1554         if (bus->fd < 0)
1555                 return -ENOTCONN;
1556
1557         if (bus->state == BUS_AUTHENTICATING) {
1558                 *timeout_usec = bus->auth_timeout;
1559                 return 1;
1560         }
1561
1562         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1563                 return 0;
1564
1565         c = prioq_peek(bus->reply_callbacks_prioq);
1566         if (!c)
1567                 return 0;
1568
1569         *timeout_usec = c->timeout;
1570         return 1;
1571 }
1572
1573 static int process_timeout(sd_bus *bus) {
1574         struct reply_callback *c;
1575         usec_t n;
1576         int r;
1577
1578         assert(bus);
1579
1580         c = prioq_peek(bus->reply_callbacks_prioq);
1581         if (!c)
1582                 return 0;
1583
1584         n = now(CLOCK_MONOTONIC);
1585         if (c->timeout > n)
1586                 return 0;
1587
1588         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1589         hashmap_remove(bus->reply_callbacks, &c->serial);
1590
1591         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1592         free(c);
1593
1594         return r < 0 ? r : 1;
1595 }
1596
1597 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1598         struct reply_callback *c;
1599         int r;
1600
1601         assert(bus);
1602         assert(m);
1603
1604         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1605             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1606                 return 0;
1607
1608         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1609         if (!c)
1610                 return 0;
1611
1612         if (c->timeout != 0)
1613                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1614
1615         r = c->callback(bus, 0, m, c->userdata);
1616         free(c);
1617
1618         return r;
1619 }
1620
1621 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1622         struct filter_callback *l;
1623         int r;
1624
1625         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1626                 r = l->callback(bus, 0, m, l->userdata);
1627                 if (r != 0)
1628                         return r;
1629         }
1630
1631         return 0;
1632 }
1633
1634 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1635         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1636         int r;
1637
1638         assert(bus);
1639         assert(m);
1640
1641         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1642                 return 0;
1643
1644         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1645                 return 0;
1646
1647         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1648                 return 1;
1649
1650         if (streq_ptr(m->member, "Ping"))
1651                 r = sd_bus_message_new_method_return(bus, m, &reply);
1652         else if (streq_ptr(m->member, "GetMachineId")) {
1653                 sd_id128_t id;
1654                 char sid[33];
1655
1656                 r = sd_id128_get_machine(&id);
1657                 if (r < 0)
1658                         return r;
1659
1660                 r = sd_bus_message_new_method_return(bus, m, &reply);
1661                 if (r < 0)
1662                         return r;
1663
1664                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1665         } else {
1666                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1667
1668                 sd_bus_error_set(&error,
1669                                  "org.freedesktop.DBus.Error.UnknownMethod",
1670                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1671
1672                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1673         }
1674
1675         if (r < 0)
1676                 return r;
1677
1678         r = sd_bus_send(bus, reply, NULL);
1679         if (r < 0)
1680                 return r;
1681
1682         return 1;
1683 }
1684
1685 static int process_object(sd_bus *bus, sd_bus_message *m) {
1686         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1687         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1688         struct object_callback *c;
1689         char *p;
1690         int r;
1691         bool found = false;
1692
1693         assert(bus);
1694         assert(m);
1695
1696         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1697                 return 0;
1698
1699         if (hashmap_isempty(bus->object_callbacks))
1700                 return 0;
1701
1702         c = hashmap_get(bus->object_callbacks, m->path);
1703         if (c) {
1704                 r = c->callback(bus, 0, m, c->userdata);
1705                 if (r != 0)
1706                         return r;
1707
1708                 found = true;
1709         }
1710
1711         /* Look for fallback prefixes */
1712         p = strdupa(m->path);
1713         for (;;) {
1714                 char *e;
1715
1716                 e = strrchr(p, '/');
1717                 if (e == p || !e)
1718                         break;
1719
1720                 *e = 0;
1721
1722                 c = hashmap_get(bus->object_callbacks, p);
1723                 if (c && c->is_fallback) {
1724                         r = c->callback(bus, 0, m, c->userdata);
1725                         if (r != 0)
1726                                 return r;
1727
1728                         found = true;
1729                 }
1730         }
1731
1732         if (!found)
1733                 return 0;
1734
1735         sd_bus_error_set(&error,
1736                          "org.freedesktop.DBus.Error.UnknownMethod",
1737                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1738
1739         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1740         if (r < 0)
1741                 return r;
1742
1743         r = sd_bus_send(bus, reply, NULL);
1744         if (r < 0)
1745                 return r;
1746
1747         return 1;
1748 }
1749
1750 static int process_message(sd_bus *bus, sd_bus_message *m) {
1751         int r;
1752
1753         assert(bus);
1754         assert(m);
1755
1756         r = process_reply(bus, m);
1757         if (r != 0)
1758                 return r;
1759
1760         r = process_filter(bus, m);
1761         if (r != 0)
1762                 return r;
1763
1764         r = process_builtin(bus, m);
1765         if (r != 0)
1766                 return r;
1767
1768         return process_object(bus, m);
1769 }
1770
1771 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1772         int r;
1773
1774         /* Returns 0 when we didn't do anything. This should cause the
1775          * caller to invoke sd_bus_wait() before returning the next
1776          * time. Returns > 0 when we did something, which possibly
1777          * means *ret is filled in with an unprocessed message. */
1778
1779         if (!bus)
1780                 return -EINVAL;
1781         if (bus->fd < 0)
1782                 return -ENOTCONN;
1783
1784         if (bus->state == BUS_OPENING) {
1785                 struct pollfd p;
1786
1787                 zero(p);
1788                 p.fd = bus->fd;
1789                 p.events = POLLOUT;
1790
1791                 r = poll(&p, 1, 0);
1792                 if (r < 0)
1793                         return -errno;
1794
1795                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1796                         int error = 0;
1797                         socklen_t slen = sizeof(error);
1798
1799                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1800                         if (r < 0)
1801                                 bus->last_connect_error = errno;
1802                         else if (error != 0)
1803                                 bus->last_connect_error = error;
1804                         else if (p.revents & (POLLERR|POLLHUP))
1805                                 bus->last_connect_error = ECONNREFUSED;
1806                         else {
1807                                 r = bus_start_auth(bus);
1808                                 goto null_message;
1809                         }
1810
1811                         /* Try next address */
1812                         r = bus_start_connect(bus);
1813                         goto null_message;
1814                 }
1815
1816                 r = 0;
1817                 goto null_message;
1818
1819         } else if (bus->state == BUS_AUTHENTICATING) {
1820
1821                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1822                         return -ETIMEDOUT;
1823
1824                 r = bus_write_auth(bus);
1825                 if (r != 0)
1826                         goto null_message;
1827
1828                 r = bus_read_auth(bus);
1829                 goto null_message;
1830
1831         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1832                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1833                 int k;
1834
1835                 r = process_timeout(bus);
1836                 if (r != 0)
1837                         goto null_message;
1838
1839                 r = dispatch_wqueue(bus);
1840                 if (r != 0)
1841                         goto null_message;
1842
1843                 k = r;
1844                 r = dispatch_rqueue(bus, &m);
1845                 if (r < 0)
1846                         return r;
1847                 if (!m) {
1848                         if (r == 0)
1849                                 r = k;
1850                         goto null_message;
1851                 }
1852
1853                 r = process_message(bus, m);
1854                 if (r != 0)
1855                         goto null_message;
1856
1857                 if (ret) {
1858                         *ret = m;
1859                         m = NULL;
1860                         return 1;
1861                 }
1862
1863                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1864                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1865                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1866
1867                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1868
1869                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1870                         if (r < 0)
1871                                 return r;
1872
1873                         r = sd_bus_send(bus, reply, NULL);
1874                         if (r < 0)
1875                                 return r;
1876                 }
1877
1878                 return 1;
1879         }
1880
1881         assert_not_reached("Unknown state");
1882
1883 null_message:
1884         if (r >= 0 && ret)
1885                 *ret = NULL;
1886
1887         return r;
1888 }
1889
1890 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1891         struct pollfd p;
1892         int r, e;
1893         struct timespec ts;
1894         usec_t until, m;
1895
1896         assert(bus);
1897
1898         if (bus->fd < 0)
1899                 return -ENOTCONN;
1900
1901         e = sd_bus_get_events(bus);
1902         if (e < 0)
1903                 return e;
1904
1905         if (need_more)
1906                 e |= POLLIN;
1907
1908         r = sd_bus_get_timeout(bus, &until);
1909         if (r < 0)
1910                 return r;
1911         if (r == 0)
1912                 m = (uint64_t) -1;
1913         else {
1914                 usec_t n;
1915                 n = now(CLOCK_MONOTONIC);
1916                 m = until > n ? until - n : 0;
1917         }
1918
1919         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1920                 m = timeout_usec;
1921
1922         zero(p);
1923         p.fd = bus->fd;
1924         p.events = e;
1925
1926         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1927         if (r < 0)
1928                 return -errno;
1929
1930         return r > 0 ? 1 : 0;
1931 }
1932
1933 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1934
1935         if (!bus)
1936                 return -EINVAL;
1937         if (bus->fd < 0)
1938                 return -ENOTCONN;
1939         if (bus->rqueue_size > 0)
1940                 return 0;
1941
1942         return bus_poll(bus, false, timeout_usec);
1943 }
1944
1945 int sd_bus_flush(sd_bus *bus) {
1946         int r;
1947
1948         if (!bus)
1949                 return -EINVAL;
1950         if (bus->fd < 0)
1951                 return -ENOTCONN;
1952
1953         r = bus_ensure_running(bus);
1954         if (r < 0)
1955                 return r;
1956
1957         if (bus->wqueue_size <= 0)
1958                 return 0;
1959
1960         for (;;) {
1961                 r = dispatch_wqueue(bus);
1962                 if (r < 0)
1963                         return r;
1964
1965                 if (bus->wqueue_size <= 0)
1966                         return 0;
1967
1968                 r = bus_poll(bus, false, (uint64_t) -1);
1969                 if (r < 0)
1970                         return r;
1971         }
1972 }
1973
1974 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1975         struct filter_callback *f;
1976
1977         if (!bus)
1978                 return -EINVAL;
1979         if (!callback)
1980                 return -EINVAL;
1981
1982         f = new(struct filter_callback, 1);
1983         if (!f)
1984                 return -ENOMEM;
1985         f->callback = callback;
1986         f->userdata = userdata;
1987
1988         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1989         return 0;
1990 }
1991
1992 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1993         struct filter_callback *f;
1994
1995         if (!bus)
1996                 return -EINVAL;
1997         if (!callback)
1998                 return -EINVAL;
1999
2000         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2001                 if (f->callback == callback && f->userdata == userdata) {
2002                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2003                         free(f);
2004                         return 1;
2005                 }
2006         }
2007
2008         return 0;
2009 }
2010
2011 static int bus_add_object(
2012                 sd_bus *bus,
2013                 bool fallback,
2014                 const char *path,
2015                 sd_message_handler_t callback,
2016                 void *userdata) {
2017
2018         struct object_callback *c;
2019         int r;
2020
2021         if (!bus)
2022                 return -EINVAL;
2023         if (!path)
2024                 return -EINVAL;
2025         if (!callback)
2026                 return -EINVAL;
2027
2028         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2029         if (r < 0)
2030                 return r;
2031
2032         c = new(struct object_callback, 1);
2033         if (!c)
2034                 return -ENOMEM;
2035
2036         c->path = strdup(path);
2037         if (!path) {
2038                 free(c);
2039                 return -ENOMEM;
2040         }
2041
2042         c->callback = callback;
2043         c->userdata = userdata;
2044         c->is_fallback = fallback;
2045
2046         r = hashmap_put(bus->object_callbacks, c->path, c);
2047         if (r < 0) {
2048                 free(c->path);
2049                 free(c);
2050                 return r;
2051         }
2052
2053         return 0;
2054 }
2055
2056 static int bus_remove_object(
2057                 sd_bus *bus,
2058                 bool fallback,
2059                 const char *path,
2060                 sd_message_handler_t callback,
2061                 void *userdata) {
2062
2063         struct object_callback *c;
2064
2065         if (!bus)
2066                 return -EINVAL;
2067         if (!path)
2068                 return -EINVAL;
2069         if (!callback)
2070                 return -EINVAL;
2071
2072         c = hashmap_get(bus->object_callbacks, path);
2073         if (!c)
2074                 return 0;
2075
2076         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2077                 return 0;
2078
2079         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2080
2081         free(c->path);
2082         free(c);
2083
2084         return 1;
2085 }
2086
2087 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2088         return bus_add_object(bus, false, path, callback, userdata);
2089 }
2090
2091 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2092         return bus_remove_object(bus, false, path, callback, userdata);
2093 }
2094
2095 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2096         return bus_add_object(bus, true, prefix, callback, userdata);
2097 }
2098
2099 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2100         return bus_remove_object(bus, true, prefix, callback, userdata);
2101 }