chiark / gitweb /
bus: properly handle termination of connections
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "missing.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         struct object_callback *c;
44         unsigned i;
45
46         assert(b);
47
48         if (b->fd >= 0)
49                 close_nointr_nofail(b->fd);
50
51         free(b->rbuffer);
52         free(b->unique_name);
53         free(b->auth_uid);
54         free(b->address);
55
56         close_many(b->fds, b->n_fds);
57         free(b->fds);
58
59         for (i = 0; i < b->rqueue_size; i++)
60                 sd_bus_message_unref(b->rqueue[i]);
61         free(b->rqueue);
62
63         for (i = 0; i < b->wqueue_size; i++)
64                 sd_bus_message_unref(b->wqueue[i]);
65         free(b->wqueue);
66
67         hashmap_free_free(b->reply_callbacks);
68         prioq_free(b->reply_callbacks_prioq);
69
70         while ((f = b->filter_callbacks)) {
71                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
72                 free(f);
73         }
74
75         while ((c = hashmap_steal_first(b->object_callbacks))) {
76                 free(c->path);
77                 free(c);
78         }
79
80         hashmap_free(b->object_callbacks);
81
82         free(b);
83 }
84
85 static sd_bus* bus_new(void) {
86         sd_bus *r;
87
88         r = new0(sd_bus, 1);
89         if (!r)
90                 return NULL;
91
92         r->n_ref = 1;
93         r->fd = -1;
94         r->message_version = 1;
95
96         /* We guarantee that wqueue always has space for at least one
97          * entry */
98         r->wqueue = new(sd_bus_message*, 1);
99         if (!r->wqueue) {
100                 free(r);
101                 return NULL;
102         }
103
104         return r;
105 };
106
107 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
108         const char *s;
109         int r;
110
111         assert(bus);
112
113         if (error != 0)
114                 return -error;
115
116         assert(reply);
117
118         r = sd_bus_message_read(reply, "s", &s);
119         if (r < 0)
120                 return r;
121
122         if (!service_name_is_valid(s) || s[0] != ':')
123                 return -EBADMSG;
124
125         bus->unique_name = strdup(s);
126         if (!bus->unique_name)
127                 return -ENOMEM;
128
129         bus->state = BUS_RUNNING;
130
131         return 1;
132 }
133
134 static int bus_send_hello(sd_bus *bus) {
135         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
136         int r;
137
138         assert(bus);
139
140         r = sd_bus_message_new_method_call(
141                         bus,
142                         "org.freedesktop.DBus",
143                         "/",
144                         "org.freedesktop.DBus",
145                         "Hello",
146                         &m);
147         if (r < 0)
148                 return r;
149
150         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
151         if (r < 0)
152                 return r;
153
154         bus->sent_hello = true;
155         return r;
156 }
157
158 static int bus_start_running(sd_bus *bus) {
159         assert(bus);
160
161         if (bus->sent_hello) {
162                 bus->state = BUS_HELLO;
163                 return 1;
164         }
165
166         bus->state = BUS_RUNNING;
167         return 1;
168 }
169
170 static int parse_address_key(const char **p, const char *key, char **value) {
171         size_t l, n = 0;
172         const char *a;
173         char *r = NULL;
174
175         assert(p);
176         assert(*p);
177         assert(key);
178         assert(value);
179
180         l = strlen(key);
181         if (strncmp(*p, key, l) != 0)
182                 return 0;
183
184         if ((*p)[l] != '=')
185                 return 0;
186
187         if (*value)
188                 return -EINVAL;
189
190         a = *p + l + 1;
191         while (*a != ',' && *a != 0) {
192                 char c, *t;
193
194                 if (*a == '%') {
195                         int x, y;
196
197                         x = unhexchar(a[1]);
198                         if (x < 0) {
199                                 free(r);
200                                 return x;
201                         }
202
203                         y = unhexchar(a[2]);
204                         if (y < 0) {
205                                 free(r);
206                                 return y;
207                         }
208
209                         c = (char) ((x << 4) | y);
210                         a += 3;
211                 } else {
212                         c = *a;
213                         a++;
214                 }
215
216                 t = realloc(r, n + 2);
217                 if (!t) {
218                         free(r);
219                         return -ENOMEM;
220                 }
221
222                 r = t;
223                 r[n++] = c;
224         }
225
226         if (!r) {
227                 r = strdup("");
228                 if (!r)
229                         return -ENOMEM;
230         } else
231                 r[n] = 0;
232
233         if (*a == ',')
234                 a++;
235
236         *p = a;
237         *value = r;
238         return 1;
239 }
240
241 static void skip_address_key(const char **p) {
242         assert(p);
243         assert(*p);
244
245         *p += strcspn(*p, ",");
246
247         if (**p == ',')
248                 (*p) ++;
249 }
250
251 static int bus_parse_next_address(sd_bus *b) {
252         const char *a, *p;
253         _cleanup_free_ char *guid = NULL;
254         int r;
255
256         assert(b);
257
258         if (!b->address)
259                 return 0;
260         if (b->address[b->address_index] == 0)
261                 return 0;
262
263         a = b->address + b->address_index;
264
265         zero(b->sockaddr);
266         b->sockaddr_size = 0;
267         b->peer = SD_ID128_NULL;
268
269         if (startswith(a, "unix:")) {
270                 _cleanup_free_ char *path = NULL, *abstract = NULL;
271
272                 p = a + 5;
273                 while (*p != 0) {
274                         r = parse_address_key(&p, "guid", &guid);
275                         if (r < 0)
276                                 return r;
277                         else if (r > 0)
278                                 continue;
279
280                         r = parse_address_key(&p, "path", &path);
281                         if (r < 0)
282                                 return r;
283                         else if (r > 0)
284                                 continue;
285
286                         r = parse_address_key(&p, "abstract", &abstract);
287                         if (r < 0)
288                                 return r;
289                         else if (r > 0)
290                                 continue;
291
292                         skip_address_key(&p);
293                 }
294
295                 if (!path && !abstract)
296                         return -EINVAL;
297
298                 if (path && abstract)
299                         return -EINVAL;
300
301                 if (path) {
302                         size_t l;
303
304                         l = strlen(path);
305                         if (l > sizeof(b->sockaddr.un.sun_path))
306                                 return -E2BIG;
307
308                         b->sockaddr.un.sun_family = AF_UNIX;
309                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
310                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
311                 } else if (abstract) {
312                         size_t l;
313
314                         l = strlen(abstract);
315                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
316                                 return -E2BIG;
317
318                         b->sockaddr.un.sun_family = AF_UNIX;
319                         b->sockaddr.un.sun_path[0] = 0;
320                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
321                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
322                 }
323
324         } else if (startswith(a, "tcp:")) {
325                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
326                 struct addrinfo hints, *result;
327
328                 p = a + 4;
329                 while (*p != 0) {
330                         r = parse_address_key(&p, "guid", &guid);
331                         if (r < 0)
332                                 return r;
333                         else if (r > 0)
334                                 continue;
335
336                         r = parse_address_key(&p, "host", &host);
337                         if (r < 0)
338                                 return r;
339                         else if (r > 0)
340                                 continue;
341
342                         r = parse_address_key(&p, "port", &port);
343                         if (r < 0)
344                                 return r;
345                         else if (r > 0)
346                                 continue;
347
348                         r = parse_address_key(&p, "family", &family);
349                         if (r < 0)
350                                 return r;
351                         else if (r > 0)
352                                 continue;
353
354                         skip_address_key(&p);
355                 }
356
357                 if (!host || !port)
358                         return -EINVAL;
359
360                 zero(hints);
361                 hints.ai_socktype = SOCK_STREAM;
362                 hints.ai_flags = AI_ADDRCONFIG;
363
364                 if (family) {
365                         if (streq(family, "ipv4"))
366                                 hints.ai_family = AF_INET;
367                         else if (streq(family, "ipv6"))
368                                 hints.ai_family = AF_INET6;
369                         else
370                                 return -EINVAL;
371                 }
372
373                 r = getaddrinfo(host, port, &hints, &result);
374                 if (r == EAI_SYSTEM)
375                         return -errno;
376                 else if (r != 0)
377                         return -EADDRNOTAVAIL;
378
379                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
380                 b->sockaddr_size = result->ai_addrlen;
381
382                 freeaddrinfo(result);
383         }
384
385         if (guid) {
386                 r = sd_id128_from_string(guid, &b->peer);
387                 if (r < 0)
388                         return r;
389         }
390
391         b->address_index = p - b->address;
392         return 1;
393 }
394
395 static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
396
397         while (size > 0) {
398                 struct iovec *i = iov + *idx;
399
400                 if (i->iov_len > size) {
401                         i->iov_base = (uint8_t*) i->iov_base + size;
402                         i->iov_len -= size;
403                         return;
404                 }
405
406                 size -= i->iov_len;
407
408                 i->iov_base = NULL;
409                 i->iov_len = 0;
410
411                 (*idx) ++;
412         }
413 }
414
415 static int bus_write_auth(sd_bus *b) {
416         struct msghdr mh;
417         ssize_t k;
418
419         assert(b);
420         assert(b->state == BUS_AUTHENTICATING);
421
422         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
423                 return 0;
424
425         if (b->auth_timeout == 0)
426                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
427
428         zero(mh);
429         mh.msg_iov = b->auth_iovec + b->auth_index;
430         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
431
432         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
433         if (k < 0)
434                 return errno == EAGAIN ? 0 : -errno;
435
436         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
437
438         return 1;
439 }
440
441 static int bus_auth_verify(sd_bus *b) {
442         char *e, *f;
443         sd_id128_t peer;
444         unsigned i;
445         int r;
446
447         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
448          * that's it */
449
450         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
451         if (!e)
452                 return 0;
453
454         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
455         if (!f)
456                 return 0;
457
458         if (e - (char*) b->rbuffer != 3 + 32)
459                 return -EPERM;
460
461         if (memcmp(b->rbuffer, "OK ", 3))
462                 return -EPERM;
463
464         for (i = 0; i < 32; i += 2) {
465                 int x, y;
466
467                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
468                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
469
470                 if (x < 0 || y < 0)
471                         return -EINVAL;
472
473                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
474         }
475
476         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
477             !sd_id128_equal(b->peer, peer))
478                 return -EPERM;
479
480         b->peer = peer;
481
482         b->can_fds =
483                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
484                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
485
486         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
487         memmove(b->rbuffer, f + 2, b->rbuffer_size);
488
489         r = bus_start_running(b);
490         if (r < 0)
491                 return r;
492
493         return 1;
494 }
495
496 static int bus_read_auth(sd_bus *b) {
497         struct msghdr mh;
498         struct iovec iov;
499         size_t n;
500         ssize_t k;
501         int r;
502         void *p;
503
504         assert(b);
505
506         r = bus_auth_verify(b);
507         if (r != 0)
508                 return r;
509
510         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
511
512         if (n > BUS_AUTH_SIZE_MAX)
513                 n = BUS_AUTH_SIZE_MAX;
514
515         if (b->rbuffer_size >= n)
516                 return -ENOBUFS;
517
518         p = realloc(b->rbuffer, n);
519         if (!p)
520                 return -ENOMEM;
521
522         b->rbuffer = p;
523
524         zero(iov);
525         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
526         iov.iov_len = n - b->rbuffer_size;
527
528         zero(mh);
529         mh.msg_iov = &iov;
530         mh.msg_iovlen = 1;
531
532         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
533         if (k < 0)
534                 return errno == EAGAIN ? 0 : -errno;
535         if (k == 0)
536                 return -ECONNRESET;
537
538         b->rbuffer_size += k;
539
540         r = bus_auth_verify(b);
541         if (r != 0)
542                 return r;
543
544         return 1;
545 }
546
547 static int bus_setup_fd(sd_bus *b) {
548         int one;
549
550         assert(b);
551
552         /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
553          * just in case. This is actually irrelavant for */
554         one = 1;
555         setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
556         setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
557
558         /* Increase the buffers to a MB */
559         fd_inc_rcvbuf(b->fd, 1024*1024);
560         fd_inc_sndbuf(b->fd, 1024*1024);
561
562         return 0;
563 }
564
565 static int bus_start_auth(sd_bus *b) {
566         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
567         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
568
569         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
570         size_t l;
571
572         assert(b);
573
574         b->state = BUS_AUTHENTICATING;
575
576         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
577         char_array_0(text);
578
579         l = strlen(text);
580         b->auth_uid = hexmem(text, l);
581         if (!b->auth_uid)
582                 return -ENOMEM;
583
584         b->auth_iovec[0].iov_base = (void*) auth_prefix;
585         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
586         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
587         b->auth_iovec[1].iov_len = l * 2;
588         b->auth_iovec[2].iov_base = (void*) auth_suffix;
589         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
590         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
591
592         return bus_write_auth(b);
593 }
594
595 static int bus_start_connect(sd_bus *b) {
596         int r;
597
598         assert(b);
599         assert(b->fd < 0);
600
601         for (;;) {
602                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
603                         r = bus_parse_next_address(b);
604                         if (r < 0)
605                                 return r;
606                         if (r == 0)
607                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
608                 }
609
610                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
611                 if (b->fd < 0) {
612                         b->last_connect_error = errno;
613                         goto try_again;
614                 }
615
616                 r = bus_setup_fd(b);
617                 if (r < 0) {
618                         b->last_connect_error = errno;
619                         goto try_again;
620                 }
621
622                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
623                 if (r < 0) {
624                         if (errno == EINPROGRESS)
625                                 return 1;
626
627                         b->last_connect_error = errno;
628                         goto try_again;
629                 }
630
631                 return bus_start_auth(b);
632
633         try_again:
634                 zero(b->sockaddr);
635
636                 if (b->fd >= 0) {
637                         close_nointr_nofail(b->fd);
638                         b->fd = -1;
639                 }
640         }
641 }
642
643 int sd_bus_open_system(sd_bus **ret) {
644         const char *e;
645         sd_bus *b;
646         int r;
647
648         if (!ret)
649                 return -EINVAL;
650
651         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
652         if (e) {
653                 r = sd_bus_open_address(e, &b);
654                 if (r < 0)
655                         return r;
656         } else {
657                 b = bus_new();
658                 if (!b)
659                         return -ENOMEM;
660
661                 b->sockaddr.un.sun_family = AF_UNIX;
662                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
663                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
664
665                 r = bus_start_connect(b);
666                 if (r < 0) {
667                         bus_free(b);
668                         return r;
669                 }
670         }
671
672         r = bus_send_hello(b);
673         if (r < 0) {
674                 sd_bus_unref(b);
675                 return r;
676         }
677
678         *ret = b;
679         return 0;
680 }
681
682 int sd_bus_open_user(sd_bus **ret) {
683         const char *e;
684         sd_bus *b;
685         size_t l;
686         int r;
687
688         if (!ret)
689                 return -EINVAL;
690
691         e = getenv("DBUS_SESSION_BUS_ADDRESS");
692         if (e) {
693                 r = sd_bus_open_address(e, &b);
694                 if (r < 0)
695                         return r;
696         } else {
697                 e = getenv("XDG_RUNTIME_DIR");
698                 if (!e)
699                         return -ENOENT;
700
701                 l = strlen(e);
702                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
703                         return -E2BIG;
704
705                 b = bus_new();
706                 if (!b)
707                         return -ENOMEM;
708
709                 b->sockaddr.un.sun_family = AF_UNIX;
710                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
711                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
712
713                 r = bus_start_connect(b);
714                 if (r < 0) {
715                         bus_free(b);
716                         return r;
717                 }
718         }
719
720         r = bus_send_hello(b);
721         if (r < 0) {
722                 sd_bus_unref(b);
723                 return r;
724         }
725
726         *ret = b;
727         return 0;
728 }
729
730 int sd_bus_open_address(const char *address, sd_bus **ret) {
731         sd_bus *b;
732         int r;
733
734         if (!address)
735                 return -EINVAL;
736         if (!ret)
737                 return -EINVAL;
738
739         b = bus_new();
740         if (!b)
741                 return -ENOMEM;
742
743         b->address = strdup(address);
744         if (!b->address) {
745                 bus_free(b);
746                 return -ENOMEM;
747         }
748
749         r = bus_start_connect(b);
750         if (r < 0) {
751                 bus_free(b);
752                 return r;
753         }
754
755         *ret = b;
756         return 0;
757 }
758
759 int sd_bus_open_fd(int fd, sd_bus **ret) {
760         sd_bus *b;
761         int r;
762
763         if (fd < 0)
764                 return -EINVAL;
765         if (!ret)
766                 return -EINVAL;
767
768         b = bus_new();
769         if (!b)
770                 return -ENOMEM;
771
772         b->fd = fd;
773
774         r = fd_nonblock(b->fd, true);
775         if (r < 0)
776                 goto fail;
777
778         fd_cloexec(b->fd, true);
779         if (r < 0)
780                 goto fail;
781
782         r = bus_setup_fd(b);
783         if (r < 0)
784                 goto fail;
785
786         r = bus_start_auth(b);
787         if (r < 0)
788                 goto fail;
789
790         *ret = b;
791         return 0;
792
793 fail:
794                 bus_free(b);
795         return r;
796 }
797
798 void sd_bus_close(sd_bus *bus) {
799         if (!bus)
800                 return;
801         if (bus->fd < 0)
802                 return;
803
804         close_nointr_nofail(bus->fd);
805         bus->fd = -1;
806 }
807
808 sd_bus *sd_bus_ref(sd_bus *bus) {
809         if (!bus)
810                 return NULL;
811
812         assert(bus->n_ref > 0);
813
814         bus->n_ref++;
815         return bus;
816 }
817
818 sd_bus *sd_bus_unref(sd_bus *bus) {
819         if (!bus)
820                 return NULL;
821
822         assert(bus->n_ref > 0);
823         bus->n_ref--;
824
825         if (bus->n_ref <= 0)
826                 bus_free(bus);
827
828         return NULL;
829 }
830
831 int sd_bus_is_open(sd_bus *bus) {
832         if (!bus)
833                 return -EINVAL;
834
835         return bus->fd >= 0;
836 }
837
838 int sd_bus_can_send(sd_bus *bus, char type) {
839         int r;
840
841         if (!bus)
842                 return -EINVAL;
843
844         if (type == SD_BUS_TYPE_UNIX_FD) {
845                 r = bus_ensure_running(bus);
846                 if (r < 0)
847                         return r;
848
849                 return bus->can_fds;
850         }
851
852         return bus_type_is_valid(type);
853 }
854
855 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
856         int r;
857
858         if (!bus)
859                 return -EINVAL;
860         if (!peer)
861                 return -EINVAL;
862
863         r = bus_ensure_running(bus);
864         if (r < 0)
865                 return r;
866
867         *peer = bus->peer;
868         return 0;
869 }
870
871 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
872         assert(m);
873
874         if (m->header->version > b->message_version)
875                 return -EPERM;
876
877         if (m->sealed)
878                 return 0;
879
880         return bus_message_seal(m, ++b->serial);
881 }
882
883 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
884         struct msghdr mh;
885         struct iovec *iov;
886         ssize_t k;
887         size_t n;
888         unsigned j;
889
890         assert(bus);
891         assert(m);
892         assert(idx);
893         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
894
895         if (*idx >= m->size)
896                 return 0;
897         zero(mh);
898
899         if (m->n_fds > 0) {
900                 struct cmsghdr *control;
901                 control = alloca(CMSG_SPACE(sizeof(int) * m->n_fds));
902
903                 mh.msg_control = control;
904                 control->cmsg_level = SOL_SOCKET;
905                 control->cmsg_type = SCM_RIGHTS;
906                 mh.msg_controllen = control->cmsg_len = CMSG_LEN(sizeof(int) * m->n_fds);
907                 memcpy(CMSG_DATA(control), m->fds, sizeof(int) * m->n_fds);
908         }
909
910         n = m->n_iovec * sizeof(struct iovec);
911         iov = alloca(n);
912         memcpy(iov, m->iovec, n);
913
914         j = 0;
915         iovec_advance(iov, &j, *idx);
916
917         mh.msg_iov = iov;
918         mh.msg_iovlen = m->n_iovec;
919
920         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
921         if (k < 0)
922                 return errno == EAGAIN ? 0 : -errno;
923
924         *idx += (size_t) k;
925         return 1;
926 }
927
928 static int message_read_need(sd_bus *bus, size_t *need) {
929         uint32_t a, b;
930         uint8_t e;
931         uint64_t sum;
932
933         assert(bus);
934         assert(need);
935         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
936
937         if (bus->rbuffer_size < sizeof(struct bus_header)) {
938                 *need = sizeof(struct bus_header) + 8;
939
940                 /* Minimum message size:
941                  *
942                  * Header +
943                  *
944                  *  Method Call: +2 string headers
945                  *       Signal: +3 string headers
946                  * Method Error: +1 string headers
947                  *               +1 uint32 headers
948                  * Method Reply: +1 uint32 headers
949                  *
950                  * A string header is at least 9 bytes
951                  * A uint32 header is at least 8 bytes
952                  *
953                  * Hence the minimum message size of a valid message
954                  * is header + 8 bytes */
955
956                 return 0;
957         }
958
959         a = ((const uint32_t*) bus->rbuffer)[1];
960         b = ((const uint32_t*) bus->rbuffer)[3];
961
962         e = ((const uint8_t*) bus->rbuffer)[0];
963         if (e == SD_BUS_LITTLE_ENDIAN) {
964                 a = le32toh(a);
965                 b = le32toh(b);
966         } else if (e == SD_BUS_BIG_ENDIAN) {
967                 a = be32toh(a);
968                 b = be32toh(b);
969         } else
970                 return -EBADMSG;
971
972         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
973         if (sum >= BUS_MESSAGE_SIZE_MAX)
974                 return -ENOBUFS;
975
976         *need = (size_t) sum;
977         return 0;
978 }
979
980 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
981         sd_bus_message *t;
982         void *b;
983         int r;
984
985         assert(bus);
986         assert(m);
987         assert(bus->rbuffer_size >= size);
988         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
989
990         if (bus->rbuffer_size > size) {
991                 b = memdup((const uint8_t*) bus->rbuffer + size,
992                            bus->rbuffer_size - size);
993                 if (!b)
994                         return -ENOMEM;
995         } else
996                 b = NULL;
997
998         r = bus_message_from_malloc(bus->rbuffer, size,
999                                     bus->fds, bus->n_fds,
1000                                     bus->ucred_valid ? &bus->ucred : NULL,
1001                                     bus->label[0] ? bus->label : NULL,
1002                                     &t);
1003         if (r < 0) {
1004                 free(b);
1005                 return r;
1006         }
1007
1008         bus->rbuffer = b;
1009         bus->rbuffer_size -= size;
1010
1011         bus->fds = NULL;
1012         bus->n_fds = 0;
1013
1014         *m = t;
1015         return 1;
1016 }
1017
1018 static int message_read(sd_bus *bus, sd_bus_message **m) {
1019         struct msghdr mh;
1020         struct iovec iov;
1021         ssize_t k;
1022         size_t need;
1023         int r;
1024         void *b;
1025         union {
1026                 struct cmsghdr cmsghdr;
1027                 uint8_t buf[CMSG_SPACE(sizeof(int) * BUS_FDS_MAX) +
1028                             CMSG_SPACE(sizeof(struct ucred)) +
1029                             CMSG_SPACE(NAME_MAX)]; /*selinux label */
1030         } control;
1031         struct cmsghdr *cmsg;
1032
1033         assert(bus);
1034         assert(m);
1035         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1036
1037         r = message_read_need(bus, &need);
1038         if (r < 0)
1039                 return r;
1040
1041         if (bus->rbuffer_size >= need)
1042                 return message_make(bus, need, m);
1043
1044         b = realloc(bus->rbuffer, need);
1045         if (!b)
1046                 return -ENOMEM;
1047
1048         bus->rbuffer = b;
1049
1050         zero(iov);
1051         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1052         iov.iov_len = need - bus->rbuffer_size;
1053
1054         zero(mh);
1055         mh.msg_iov = &iov;
1056         mh.msg_iovlen = 1;
1057         mh.msg_control = &control;
1058         mh.msg_controllen = sizeof(control);
1059
1060         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1061         if (k < 0)
1062                 return errno == EAGAIN ? 0 : -errno;
1063         if (k == 0)
1064                 return -ECONNRESET;
1065
1066         bus->rbuffer_size += k;
1067
1068         for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1069                 if (cmsg->cmsg_level == SOL_SOCKET &&
1070                     cmsg->cmsg_type == SCM_RIGHTS) {
1071                         int n, *f;
1072
1073                         n = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int);
1074
1075                         f = realloc(bus->fds, sizeof(int) + (bus->n_fds + n));
1076                         if (!f) {
1077                                 close_many((int*) CMSG_DATA(cmsg), n);
1078                                 return -ENOMEM;
1079                         }
1080
1081                         memcpy(f + bus->n_fds, CMSG_DATA(cmsg), n * sizeof(int));
1082                         bus->fds = f;
1083                         bus->n_fds += n;
1084                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1085                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1086                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1087
1088                         memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1089                         bus->ucred_valid = true;
1090
1091                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1092                          cmsg->cmsg_type == SCM_SECURITY) {
1093
1094                         size_t l;
1095                         l = cmsg->cmsg_len - CMSG_LEN(0);
1096                         memcpy(&bus->label, CMSG_DATA(cmsg), l);
1097                         bus->label[l] = 0;
1098                 }
1099         }
1100
1101         r = message_read_need(bus, &need);
1102         if (r < 0)
1103                 return r;
1104
1105         if (bus->rbuffer_size >= need)
1106                 return message_make(bus, need, m);
1107
1108         return 1;
1109 }
1110
1111 static int dispatch_wqueue(sd_bus *bus) {
1112         int r, ret = 0;
1113
1114         assert(bus);
1115         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1116
1117         if (bus->fd < 0)
1118                 return -ENOTCONN;
1119
1120         while (bus->wqueue_size > 0) {
1121
1122                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1123                 if (r < 0) {
1124                         sd_bus_close(bus);
1125                         return r;
1126                 } else if (r == 0)
1127                         /* Didn't do anything this time */
1128                         return ret;
1129                 else if (bus->windex >= bus->wqueue[0]->size) {
1130                         /* Fully written. Let's drop the entry from
1131                          * the queue.
1132                          *
1133                          * This isn't particularly optimized, but
1134                          * well, this is supposed to be our worst-case
1135                          * buffer only, and the socket buffer is
1136                          * supposed to be our primary buffer, and if
1137                          * it got full, then all bets are off
1138                          * anyway. */
1139
1140                         sd_bus_message_unref(bus->wqueue[0]);
1141                         bus->wqueue_size --;
1142                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1143                         bus->windex = 0;
1144
1145                         ret = 1;
1146                 }
1147         }
1148
1149         return ret;
1150 }
1151
1152 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1153         sd_bus_message *z = NULL;
1154         int r, ret = 0;
1155
1156         assert(bus);
1157         assert(m);
1158         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1159
1160         if (bus->fd < 0)
1161                 return -ENOTCONN;
1162
1163         if (bus->rqueue_size > 0) {
1164                 /* Dispatch a queued message */
1165
1166                 *m = bus->rqueue[0];
1167                 bus->rqueue_size --;
1168                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1169                 return 1;
1170         }
1171
1172         /* Try to read a new message */
1173         do {
1174                 r = message_read(bus, &z);
1175                 if (r < 0) {
1176                         sd_bus_close(bus);
1177                         return r;
1178                 }
1179                 if (r == 0)
1180                         return ret;
1181
1182                 r = 1;
1183         } while (!z);
1184
1185         *m = z;
1186         return 1;
1187 }
1188
1189 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1190         int r;
1191
1192         if (!bus)
1193                 return -EINVAL;
1194         if (bus->fd < 0)
1195                 return -ENOTCONN;
1196         if (!m)
1197                 return -EINVAL;
1198         if (m->n_fds > 0 && !bus->can_fds)
1199                 return -ENOTSUP;
1200
1201         /* If the serial number isn't kept, then we know that no reply
1202          * is expected */
1203         if (!serial && !m->sealed)
1204                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1205
1206         r = bus_seal_message(bus, m);
1207         if (r < 0)
1208                 return r;
1209
1210         /* If this is a reply and no reply was requested, then let's
1211          * suppress this, if we can */
1212         if (m->dont_send && !serial)
1213                 return 0;
1214
1215         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1216                 size_t idx = 0;
1217
1218                 r = message_write(bus, m, &idx);
1219                 if (r < 0) {
1220                         sd_bus_close(bus);
1221                         return r;
1222                 } else if (idx < m->size)  {
1223                         /* Wasn't fully written. So let's remember how
1224                          * much was written. Note that the first entry
1225                          * of the wqueue array is always allocated so
1226                          * that we always can remember how much was
1227                          * written. */
1228                         bus->wqueue[0] = sd_bus_message_ref(m);
1229                         bus->wqueue_size = 1;
1230                         bus->windex = idx;
1231                 }
1232         } else {
1233                 sd_bus_message **q;
1234
1235                 /* Just append it to the queue. */
1236
1237                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1238                         return -ENOBUFS;
1239
1240                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1241                 if (!q)
1242                         return -ENOMEM;
1243
1244                 bus->wqueue = q;
1245                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1246         }
1247
1248         if (serial)
1249                 *serial = BUS_MESSAGE_SERIAL(m);
1250
1251         return 0;
1252 }
1253
1254 static usec_t calc_elapse(uint64_t usec) {
1255         if (usec == (uint64_t) -1)
1256                 return 0;
1257
1258         if (usec == 0)
1259                 usec = BUS_DEFAULT_TIMEOUT;
1260
1261         return now(CLOCK_MONOTONIC) + usec;
1262 }
1263
1264 static int timeout_compare(const void *a, const void *b) {
1265         const struct reply_callback *x = a, *y = b;
1266
1267         if (x->timeout != 0 && y->timeout == 0)
1268                 return -1;
1269
1270         if (x->timeout == 0 && y->timeout != 0)
1271                 return 1;
1272
1273         if (x->timeout < y->timeout)
1274                 return -1;
1275
1276         if (x->timeout > y->timeout)
1277                 return 1;
1278
1279         return 0;
1280 }
1281
1282 int sd_bus_send_with_reply(
1283                 sd_bus *bus,
1284                 sd_bus_message *m,
1285                 sd_message_handler_t callback,
1286                 void *userdata,
1287                 uint64_t usec,
1288                 uint64_t *serial) {
1289
1290         struct reply_callback *c;
1291         int r;
1292
1293         if (!bus)
1294                 return -EINVAL;
1295         if (bus->fd < 0)
1296                 return -ENOTCONN;
1297         if (!m)
1298                 return -EINVAL;
1299         if (!callback)
1300                 return -EINVAL;
1301         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1302                 return -EINVAL;
1303         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1304                 return -EINVAL;
1305
1306         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1307         if (r < 0)
1308                 return r;
1309
1310         if (usec != (uint64_t) -1) {
1311                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1312                 if (r < 0)
1313                         return r;
1314         }
1315
1316         r = bus_seal_message(bus, m);
1317         if (r < 0)
1318                 return r;
1319
1320         c = new(struct reply_callback, 1);
1321         if (!c)
1322                 return -ENOMEM;
1323
1324         c->callback = callback;
1325         c->userdata = userdata;
1326         c->serial = BUS_MESSAGE_SERIAL(m);
1327         c->timeout = calc_elapse(usec);
1328
1329         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1330         if (r < 0) {
1331                 free(c);
1332                 return r;
1333         }
1334
1335         if (c->timeout != 0) {
1336                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1337                 if (r < 0) {
1338                         c->timeout = 0;
1339                         sd_bus_send_with_reply_cancel(bus, c->serial);
1340                         return r;
1341                 }
1342         }
1343
1344         r = sd_bus_send(bus, m, serial);
1345         if (r < 0) {
1346                 sd_bus_send_with_reply_cancel(bus, c->serial);
1347                 return r;
1348         }
1349
1350         return r;
1351 }
1352
1353 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1354         struct reply_callback *c;
1355
1356         if (!bus)
1357                 return -EINVAL;
1358         if (serial == 0)
1359                 return -EINVAL;
1360
1361         c = hashmap_remove(bus->reply_callbacks, &serial);
1362         if (!c)
1363                 return 0;
1364
1365         if (c->timeout != 0)
1366                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1367
1368         free(c);
1369         return 1;
1370 }
1371
1372 int bus_ensure_running(sd_bus *bus) {
1373         int r;
1374
1375         assert(bus);
1376
1377         if (bus->state == BUS_RUNNING)
1378                 return 1;
1379
1380         for (;;) {
1381                 r = sd_bus_process(bus, NULL);
1382                 if (r < 0)
1383                         return r;
1384                 if (bus->state == BUS_RUNNING)
1385                         return 1;
1386                 if (r > 0)
1387                         continue;
1388
1389                 r = sd_bus_wait(bus, (uint64_t) -1);
1390                 if (r < 0)
1391                         return r;
1392         }
1393 }
1394
1395 int sd_bus_send_with_reply_and_block(
1396                 sd_bus *bus,
1397                 sd_bus_message *m,
1398                 uint64_t usec,
1399                 sd_bus_error *error,
1400                 sd_bus_message **reply) {
1401
1402         int r;
1403         usec_t timeout;
1404         uint64_t serial;
1405         bool room = false;
1406
1407         if (!bus)
1408                 return -EINVAL;
1409         if (bus->fd < 0)
1410                 return -ENOTCONN;
1411         if (!m)
1412                 return -EINVAL;
1413         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1414                 return -EINVAL;
1415         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1416                 return -EINVAL;
1417         if (bus_error_is_dirty(error))
1418                 return -EINVAL;
1419
1420         r = bus_ensure_running(bus);
1421         if (r < 0)
1422                 return r;
1423
1424         r = sd_bus_send(bus, m, &serial);
1425         if (r < 0)
1426                 return r;
1427
1428         timeout = calc_elapse(usec);
1429
1430         for (;;) {
1431                 usec_t left;
1432                 sd_bus_message *incoming = NULL;
1433
1434                 if (!room) {
1435                         sd_bus_message **q;
1436
1437                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1438                                 return -ENOBUFS;
1439
1440                         /* Make sure there's room for queuing this
1441                          * locally, before we read the message */
1442
1443                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1444                         if (!q)
1445                                 return -ENOMEM;
1446
1447                         bus->rqueue = q;
1448                         room = true;
1449                 }
1450
1451                 r = message_read(bus, &incoming);
1452                 if (r < 0)
1453                         return r;
1454                 if (incoming) {
1455
1456                         if (incoming->reply_serial == serial) {
1457                                 /* Found a match! */
1458
1459                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1460                                         *reply = incoming;
1461                                         return 0;
1462                                 }
1463
1464                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1465                                         int k;
1466
1467                                         r = sd_bus_error_copy(error, &incoming->error);
1468                                         if (r < 0) {
1469                                                 sd_bus_message_unref(incoming);
1470                                                 return r;
1471                                         }
1472
1473                                         k = bus_error_to_errno(&incoming->error);
1474                                         sd_bus_message_unref(incoming);
1475                                         return k;
1476                                 }
1477
1478                                 sd_bus_message_unref(incoming);
1479                                 return -EIO;
1480                         }
1481
1482                         /* There's already guaranteed to be room for
1483                          * this, so need to resize things here */
1484                         bus->rqueue[bus->rqueue_size ++] = incoming;
1485                         room = false;
1486
1487                         /* Try to read more, right-away */
1488                         continue;
1489                 }
1490                 if (r != 0)
1491                         continue;
1492
1493                 if (timeout > 0) {
1494                         usec_t n;
1495
1496                         n = now(CLOCK_MONOTONIC);
1497                         if (n >= timeout)
1498                                 return -ETIMEDOUT;
1499
1500                         left = timeout - n;
1501                 } else
1502                         left = (uint64_t) -1;
1503
1504                 r = bus_poll(bus, true, left);
1505                 if (r < 0)
1506                         return r;
1507
1508                 r = dispatch_wqueue(bus);
1509                 if (r < 0)
1510                         return r;
1511         }
1512 }
1513
1514 int sd_bus_get_fd(sd_bus *bus) {
1515         if (!bus)
1516                 return -EINVAL;
1517
1518         if (bus->fd < 0)
1519                 return -ENOTCONN;
1520
1521         return bus->fd;
1522 }
1523
1524 int sd_bus_get_events(sd_bus *bus) {
1525         int flags = 0;
1526
1527         if (!bus)
1528                 return -EINVAL;
1529         if (bus->fd < 0)
1530                 return -ENOTCONN;
1531
1532         if (bus->state == BUS_OPENING)
1533                 flags |= POLLOUT;
1534         else if (bus->state == BUS_AUTHENTICATING) {
1535
1536                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1537                         flags |= POLLOUT;
1538
1539                 flags |= POLLIN;
1540
1541         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1542                 if (bus->rqueue_size <= 0)
1543                         flags |= POLLIN;
1544                 if (bus->wqueue_size > 0)
1545                         flags |= POLLOUT;
1546         }
1547
1548         return flags;
1549 }
1550
1551 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1552         struct reply_callback *c;
1553
1554         if (!bus)
1555                 return -EINVAL;
1556         if (!timeout_usec)
1557                 return -EINVAL;
1558         if (bus->fd < 0)
1559                 return -ENOTCONN;
1560
1561         if (bus->state == BUS_AUTHENTICATING) {
1562                 *timeout_usec = bus->auth_timeout;
1563                 return 1;
1564         }
1565
1566         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1567                 return 0;
1568
1569         c = prioq_peek(bus->reply_callbacks_prioq);
1570         if (!c)
1571                 return 0;
1572
1573         *timeout_usec = c->timeout;
1574         return 1;
1575 }
1576
1577 static int process_timeout(sd_bus *bus) {
1578         struct reply_callback *c;
1579         usec_t n;
1580         int r;
1581
1582         assert(bus);
1583
1584         c = prioq_peek(bus->reply_callbacks_prioq);
1585         if (!c)
1586                 return 0;
1587
1588         n = now(CLOCK_MONOTONIC);
1589         if (c->timeout > n)
1590                 return 0;
1591
1592         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1593         hashmap_remove(bus->reply_callbacks, &c->serial);
1594
1595         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1596         free(c);
1597
1598         return r < 0 ? r : 1;
1599 }
1600
1601 static int process_reply(sd_bus *bus, sd_bus_message *m) {
1602         struct reply_callback *c;
1603         int r;
1604
1605         assert(bus);
1606         assert(m);
1607
1608         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_RETURN &&
1609             m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_ERROR)
1610                 return 0;
1611
1612         c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1613         if (!c)
1614                 return 0;
1615
1616         if (c->timeout != 0)
1617                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1618
1619         r = c->callback(bus, 0, m, c->userdata);
1620         free(c);
1621
1622         return r;
1623 }
1624
1625 static int process_filter(sd_bus *bus, sd_bus_message *m) {
1626         struct filter_callback *l;
1627         int r;
1628
1629         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1630                 r = l->callback(bus, 0, m, l->userdata);
1631                 if (r != 0)
1632                         return r;
1633         }
1634
1635         return 0;
1636 }
1637
1638 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1639         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1640         int r;
1641
1642         assert(bus);
1643         assert(m);
1644
1645         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1646                 return 0;
1647
1648         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1649                 return 0;
1650
1651         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1652                 return 1;
1653
1654         if (streq_ptr(m->member, "Ping"))
1655                 r = sd_bus_message_new_method_return(bus, m, &reply);
1656         else if (streq_ptr(m->member, "GetMachineId")) {
1657                 sd_id128_t id;
1658                 char sid[33];
1659
1660                 r = sd_id128_get_machine(&id);
1661                 if (r < 0)
1662                         return r;
1663
1664                 r = sd_bus_message_new_method_return(bus, m, &reply);
1665                 if (r < 0)
1666                         return r;
1667
1668                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1669         } else {
1670                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1671
1672                 sd_bus_error_set(&error,
1673                                  "org.freedesktop.DBus.Error.UnknownMethod",
1674                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1675
1676                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1677         }
1678
1679         if (r < 0)
1680                 return r;
1681
1682         r = sd_bus_send(bus, reply, NULL);
1683         if (r < 0)
1684                 return r;
1685
1686         return 1;
1687 }
1688
1689 static int process_object(sd_bus *bus, sd_bus_message *m) {
1690         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1691         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1692         struct object_callback *c;
1693         char *p;
1694         int r;
1695         bool found = false;
1696
1697         assert(bus);
1698         assert(m);
1699
1700         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1701                 return 0;
1702
1703         if (hashmap_isempty(bus->object_callbacks))
1704                 return 0;
1705
1706         c = hashmap_get(bus->object_callbacks, m->path);
1707         if (c) {
1708                 r = c->callback(bus, 0, m, c->userdata);
1709                 if (r != 0)
1710                         return r;
1711
1712                 found = true;
1713         }
1714
1715         /* Look for fallback prefixes */
1716         p = strdupa(m->path);
1717         for (;;) {
1718                 char *e;
1719
1720                 e = strrchr(p, '/');
1721                 if (e == p || !e)
1722                         break;
1723
1724                 *e = 0;
1725
1726                 c = hashmap_get(bus->object_callbacks, p);
1727                 if (c && c->is_fallback) {
1728                         r = c->callback(bus, 0, m, c->userdata);
1729                         if (r != 0)
1730                                 return r;
1731
1732                         found = true;
1733                 }
1734         }
1735
1736         if (!found)
1737                 return 0;
1738
1739         sd_bus_error_set(&error,
1740                          "org.freedesktop.DBus.Error.UnknownMethod",
1741                          "Unknown method '%s' or interface '%s'.", m->member, m->interface);
1742
1743         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1744         if (r < 0)
1745                 return r;
1746
1747         r = sd_bus_send(bus, reply, NULL);
1748         if (r < 0)
1749                 return r;
1750
1751         return 1;
1752 }
1753
1754 static int process_message(sd_bus *bus, sd_bus_message *m) {
1755         int r;
1756
1757         assert(bus);
1758         assert(m);
1759
1760         r = process_reply(bus, m);
1761         if (r != 0)
1762                 return r;
1763
1764         r = process_filter(bus, m);
1765         if (r != 0)
1766                 return r;
1767
1768         r = process_builtin(bus, m);
1769         if (r != 0)
1770                 return r;
1771
1772         return process_object(bus, m);
1773 }
1774
1775 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1776         int r;
1777
1778         /* Returns 0 when we didn't do anything. This should cause the
1779          * caller to invoke sd_bus_wait() before returning the next
1780          * time. Returns > 0 when we did something, which possibly
1781          * means *ret is filled in with an unprocessed message. */
1782
1783         if (!bus)
1784                 return -EINVAL;
1785         if (bus->fd < 0)
1786                 return -ENOTCONN;
1787
1788         if (bus->state == BUS_OPENING) {
1789                 struct pollfd p;
1790
1791                 zero(p);
1792                 p.fd = bus->fd;
1793                 p.events = POLLOUT;
1794
1795                 r = poll(&p, 1, 0);
1796                 if (r < 0)
1797                         return -errno;
1798
1799                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1800                         int error = 0;
1801                         socklen_t slen = sizeof(error);
1802
1803                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1804                         if (r < 0)
1805                                 bus->last_connect_error = errno;
1806                         else if (error != 0)
1807                                 bus->last_connect_error = error;
1808                         else if (p.revents & (POLLERR|POLLHUP))
1809                                 bus->last_connect_error = ECONNREFUSED;
1810                         else {
1811                                 r = bus_start_auth(bus);
1812                                 goto null_message;
1813                         }
1814
1815                         /* Try next address */
1816                         r = bus_start_connect(bus);
1817                         goto null_message;
1818                 }
1819
1820                 r = 0;
1821                 goto null_message;
1822
1823         } else if (bus->state == BUS_AUTHENTICATING) {
1824
1825                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1826                         return -ETIMEDOUT;
1827
1828                 r = bus_write_auth(bus);
1829                 if (r != 0)
1830                         goto null_message;
1831
1832                 r = bus_read_auth(bus);
1833                 goto null_message;
1834
1835         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1836                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1837                 int k;
1838
1839                 r = process_timeout(bus);
1840                 if (r != 0)
1841                         goto null_message;
1842
1843                 r = dispatch_wqueue(bus);
1844                 if (r != 0)
1845                         goto null_message;
1846
1847                 k = r;
1848                 r = dispatch_rqueue(bus, &m);
1849                 if (r < 0)
1850                         return r;
1851                 if (!m) {
1852                         if (r == 0)
1853                                 r = k;
1854                         goto null_message;
1855                 }
1856
1857                 r = process_message(bus, m);
1858                 if (r != 0)
1859                         goto null_message;
1860
1861                 if (ret) {
1862                         *ret = m;
1863                         m = NULL;
1864                         return 1;
1865                 }
1866
1867                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1868                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1869                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1870
1871                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1872
1873                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1874                         if (r < 0)
1875                                 return r;
1876
1877                         r = sd_bus_send(bus, reply, NULL);
1878                         if (r < 0)
1879                                 return r;
1880                 }
1881
1882                 return 1;
1883         }
1884
1885         assert_not_reached("Unknown state");
1886
1887 null_message:
1888         if (r >= 0 && ret)
1889                 *ret = NULL;
1890
1891         return r;
1892 }
1893
1894 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1895         struct pollfd p;
1896         int r, e;
1897         struct timespec ts;
1898         usec_t until, m;
1899
1900         assert(bus);
1901
1902         if (bus->fd < 0)
1903                 return -ENOTCONN;
1904
1905         e = sd_bus_get_events(bus);
1906         if (e < 0)
1907                 return e;
1908
1909         if (need_more)
1910                 e |= POLLIN;
1911
1912         r = sd_bus_get_timeout(bus, &until);
1913         if (r < 0)
1914                 return r;
1915         if (r == 0)
1916                 m = (uint64_t) -1;
1917         else {
1918                 usec_t n;
1919                 n = now(CLOCK_MONOTONIC);
1920                 m = until > n ? until - n : 0;
1921         }
1922
1923         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1924                 m = timeout_usec;
1925
1926         zero(p);
1927         p.fd = bus->fd;
1928         p.events = e;
1929
1930         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1931         if (r < 0)
1932                 return -errno;
1933
1934         return r > 0 ? 1 : 0;
1935 }
1936
1937 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1938
1939         if (!bus)
1940                 return -EINVAL;
1941         if (bus->fd < 0)
1942                 return -ENOTCONN;
1943         if (bus->rqueue_size > 0)
1944                 return 0;
1945
1946         return bus_poll(bus, false, timeout_usec);
1947 }
1948
1949 int sd_bus_flush(sd_bus *bus) {
1950         int r;
1951
1952         if (!bus)
1953                 return -EINVAL;
1954         if (bus->fd < 0)
1955                 return -ENOTCONN;
1956
1957         r = bus_ensure_running(bus);
1958         if (r < 0)
1959                 return r;
1960
1961         if (bus->wqueue_size <= 0)
1962                 return 0;
1963
1964         for (;;) {
1965                 r = dispatch_wqueue(bus);
1966                 if (r < 0)
1967                         return r;
1968
1969                 if (bus->wqueue_size <= 0)
1970                         return 0;
1971
1972                 r = bus_poll(bus, false, (uint64_t) -1);
1973                 if (r < 0)
1974                         return r;
1975         }
1976 }
1977
1978 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1979         struct filter_callback *f;
1980
1981         if (!bus)
1982                 return -EINVAL;
1983         if (!callback)
1984                 return -EINVAL;
1985
1986         f = new(struct filter_callback, 1);
1987         if (!f)
1988                 return -ENOMEM;
1989         f->callback = callback;
1990         f->userdata = userdata;
1991
1992         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1993         return 0;
1994 }
1995
1996 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1997         struct filter_callback *f;
1998
1999         if (!bus)
2000                 return -EINVAL;
2001         if (!callback)
2002                 return -EINVAL;
2003
2004         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
2005                 if (f->callback == callback && f->userdata == userdata) {
2006                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
2007                         free(f);
2008                         return 1;
2009                 }
2010         }
2011
2012         return 0;
2013 }
2014
2015 static int bus_add_object(
2016                 sd_bus *bus,
2017                 bool fallback,
2018                 const char *path,
2019                 sd_message_handler_t callback,
2020                 void *userdata) {
2021
2022         struct object_callback *c;
2023         int r;
2024
2025         if (!bus)
2026                 return -EINVAL;
2027         if (!path)
2028                 return -EINVAL;
2029         if (!callback)
2030                 return -EINVAL;
2031
2032         r = hashmap_ensure_allocated(&bus->object_callbacks, string_hash_func, string_compare_func);
2033         if (r < 0)
2034                 return r;
2035
2036         c = new(struct object_callback, 1);
2037         if (!c)
2038                 return -ENOMEM;
2039
2040         c->path = strdup(path);
2041         if (!path) {
2042                 free(c);
2043                 return -ENOMEM;
2044         }
2045
2046         c->callback = callback;
2047         c->userdata = userdata;
2048         c->is_fallback = fallback;
2049
2050         r = hashmap_put(bus->object_callbacks, c->path, c);
2051         if (r < 0) {
2052                 free(c->path);
2053                 free(c);
2054                 return r;
2055         }
2056
2057         return 0;
2058 }
2059
2060 static int bus_remove_object(
2061                 sd_bus *bus,
2062                 bool fallback,
2063                 const char *path,
2064                 sd_message_handler_t callback,
2065                 void *userdata) {
2066
2067         struct object_callback *c;
2068
2069         if (!bus)
2070                 return -EINVAL;
2071         if (!path)
2072                 return -EINVAL;
2073         if (!callback)
2074                 return -EINVAL;
2075
2076         c = hashmap_get(bus->object_callbacks, path);
2077         if (!c)
2078                 return 0;
2079
2080         if (c->callback != callback || c->userdata != userdata || c->is_fallback != fallback)
2081                 return 0;
2082
2083         assert_se(c == hashmap_remove(bus->object_callbacks, c->path));
2084
2085         free(c->path);
2086         free(c);
2087
2088         return 1;
2089 }
2090
2091 int sd_bus_add_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2092         return bus_add_object(bus, false, path, callback, userdata);
2093 }
2094
2095 int sd_bus_remove_object(sd_bus *bus, const char *path, sd_message_handler_t callback, void *userdata) {
2096         return bus_remove_object(bus, false, path, callback, userdata);
2097 }
2098
2099 int sd_bus_add_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2100         return bus_add_object(bus, true, prefix, callback, userdata);
2101 }
2102
2103 int sd_bus_remove_fallback(sd_bus *bus, const char *prefix, sd_message_handler_t callback, void *userdata) {
2104         return bus_remove_object(bus, true, prefix, callback, userdata);
2105 }