chiark / gitweb /
73774ba308c67430160892ab17bf4f7931426a87
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32 #include "missing.h"
33
34 #include "sd-bus.h"
35 #include "bus-internal.h"
36 #include "bus-message.h"
37 #include "bus-type.h"
38
39 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
40
41 static void bus_free(sd_bus *b) {
42         struct filter_callback *f;
43         unsigned i;
44
45         assert(b);
46
47         if (b->fd >= 0)
48                 close_nointr_nofail(b->fd);
49
50         free(b->rbuffer);
51         free(b->unique_name);
52         free(b->auth_uid);
53         free(b->address);
54
55         for (i = 0; i < b->rqueue_size; i++)
56                 sd_bus_message_unref(b->rqueue[i]);
57         free(b->rqueue);
58
59         for (i = 0; i < b->wqueue_size; i++)
60                 sd_bus_message_unref(b->wqueue[i]);
61         free(b->wqueue);
62
63         hashmap_free_free(b->reply_callbacks);
64         prioq_free(b->reply_callbacks_prioq);
65
66         while ((f = b->filter_callbacks)) {
67                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
68                 free(f);
69         }
70
71         free(b);
72 }
73
74 static sd_bus* bus_new(void) {
75         sd_bus *r;
76
77         r = new0(sd_bus, 1);
78         if (!r)
79                 return NULL;
80
81         r->n_ref = 1;
82         r->fd = -1;
83         r->message_version = 1;
84
85         /* We guarantee that wqueue always has space for at least one
86          * entry */
87         r->wqueue = new(sd_bus_message*, 1);
88         if (!r->wqueue) {
89                 free(r);
90                 return NULL;
91         }
92
93         return r;
94 };
95
96 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
97         const char *s;
98         int r;
99
100         assert(bus);
101
102         if (error != 0)
103                 return -error;
104
105         assert(reply);
106
107         r = sd_bus_message_read(reply, "s", &s);
108         if (r < 0)
109                 return r;
110
111         if (!service_name_is_valid(s) || s[0] != ':')
112                 return -EBADMSG;
113
114         bus->unique_name = strdup(s);
115         if (!bus->unique_name)
116                 return -ENOMEM;
117
118         bus->state = BUS_RUNNING;
119
120         return 1;
121 }
122
123 static int bus_send_hello(sd_bus *bus) {
124         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
125         int r;
126
127         assert(bus);
128
129         r = sd_bus_message_new_method_call(
130                         bus,
131                         "org.freedesktop.DBus",
132                         "/",
133                         "org.freedesktop.DBus",
134                         "Hello",
135                         &m);
136         if (r < 0)
137                 return r;
138
139         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
140         if (r < 0)
141                 return r;
142
143         bus->sent_hello = true;
144         return r;
145 }
146
147 static int bus_start_running(sd_bus *bus) {
148         assert(bus);
149
150         if (bus->sent_hello) {
151                 bus->state = BUS_HELLO;
152                 return 1;
153         }
154
155         bus->state = BUS_RUNNING;
156         return 1;
157 }
158
/* Tries to parse "<key>=<value>" at *p.
 *
 * If *p does not start with key followed by '=', nothing is touched
 * and 0 is returned. Otherwise the value, which extends up to the
 * next ',' or the end of the string, is copied into a newly allocated
 * string with "%XX" escapes decoded. *p is advanced past the value
 * and a trailing ',' (if any), *value receives the new string and 1
 * is returned.
 *
 * Returns -EINVAL if *value is already set (duplicate key), -ENOMEM
 * on allocation failure, or the negative result of unhexchar() for a
 * malformed escape. */
static int parse_address_key(const char **p, const char *key, char **value) {
        size_t key_len, used = 0;
        const char *cursor;
        char *out = NULL;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        key_len = strlen(key);
        if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                return 0;

        if (*value)
                return -EINVAL;

        for (cursor = *p + key_len + 1; *cursor != ',' && *cursor != 0;) {
                char decoded, *grown;

                if (*cursor != '%') {
                        decoded = *cursor;
                        cursor++;
                } else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(out);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(out);
                                return lo;
                        }

                        decoded = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the terminating NUL */
                grown = realloc(out, used + 2);
                if (!grown) {
                        free(out);
                        return -ENOMEM;
                }

                out = grown;
                out[used++] = decoded;
        }

        if (out)
                out[used] = 0;
        else {
                /* Empty value: still hand back an allocated string */
                out = strdup("");
                if (!out)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;
        *value = out;
        return 1;
}
229
/* Advances *p past the current "key=value" element: to just behind
 * the next ',' if there is one, otherwise to the terminating NUL. */
static void skip_address_key(const char **p) {
        const char *comma;

        assert(p);
        assert(*p);

        comma = strchr(*p, ',');
        if (comma)
                *p = comma + 1;
        else
                *p += strlen(*p);
}
239
240 static int bus_parse_next_address(sd_bus *b) {
241         const char *a, *p;
242         _cleanup_free_ char *guid = NULL;
243         int r;
244
245         assert(b);
246
247         if (!b->address)
248                 return 0;
249         if (b->address[b->address_index] == 0)
250                 return 0;
251
252         a = b->address + b->address_index;
253
254         zero(b->sockaddr);
255         b->sockaddr_size = 0;
256         b->peer = SD_ID128_NULL;
257
258         if (startswith(a, "unix:")) {
259                 _cleanup_free_ char *path = NULL, *abstract = NULL;
260
261                 p = a + 5;
262                 while (*p != 0) {
263                         r = parse_address_key(&p, "guid", &guid);
264                         if (r < 0)
265                                 return r;
266                         else if (r > 0)
267                                 continue;
268
269                         r = parse_address_key(&p, "path", &path);
270                         if (r < 0)
271                                 return r;
272                         else if (r > 0)
273                                 continue;
274
275                         r = parse_address_key(&p, "abstract", &abstract);
276                         if (r < 0)
277                                 return r;
278                         else if (r > 0)
279                                 continue;
280
281                         skip_address_key(&p);
282                 }
283
284                 if (!path && !abstract)
285                         return -EINVAL;
286
287                 if (path && abstract)
288                         return -EINVAL;
289
290                 if (path) {
291                         size_t l;
292
293                         l = strlen(path);
294                         if (l > sizeof(b->sockaddr.un.sun_path))
295                                 return -E2BIG;
296
297                         b->sockaddr.un.sun_family = AF_UNIX;
298                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
299                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
300                 } else if (abstract) {
301                         size_t l;
302
303                         l = strlen(abstract);
304                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
305                                 return -E2BIG;
306
307                         b->sockaddr.un.sun_family = AF_UNIX;
308                         b->sockaddr.un.sun_path[0] = 0;
309                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
310                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
311                 }
312
313         } else if (startswith(a, "tcp:")) {
314                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
315                 struct addrinfo hints, *result;
316
317                 p = a + 4;
318                 while (*p != 0) {
319                         r = parse_address_key(&p, "guid", &guid);
320                         if (r < 0)
321                                 return r;
322                         else if (r > 0)
323                                 continue;
324
325                         r = parse_address_key(&p, "host", &host);
326                         if (r < 0)
327                                 return r;
328                         else if (r > 0)
329                                 continue;
330
331                         r = parse_address_key(&p, "port", &port);
332                         if (r < 0)
333                                 return r;
334                         else if (r > 0)
335                                 continue;
336
337                         r = parse_address_key(&p, "family", &family);
338                         if (r < 0)
339                                 return r;
340                         else if (r > 0)
341                                 continue;
342
343                         skip_address_key(&p);
344                 }
345
346                 if (!host || !port)
347                         return -EINVAL;
348
349                 zero(hints);
350                 hints.ai_socktype = SOCK_STREAM;
351                 hints.ai_flags = AI_ADDRCONFIG;
352
353                 if (family) {
354                         if (streq(family, "ipv4"))
355                                 hints.ai_family = AF_INET;
356                         else if (streq(family, "ipv6"))
357                                 hints.ai_family = AF_INET6;
358                         else
359                                 return -EINVAL;
360                 }
361
362                 r = getaddrinfo(host, port, &hints, &result);
363                 if (r == EAI_SYSTEM)
364                         return -errno;
365                 else if (r != 0)
366                         return -EADDRNOTAVAIL;
367
368                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
369                 b->sockaddr_size = result->ai_addrlen;
370
371                 freeaddrinfo(result);
372         }
373
374         if (guid) {
375                 r = sd_id128_from_string(guid, &b->peer);
376                 if (r < 0)
377                         return r;
378         }
379
380         b->address_index = p - b->address;
381         return 1;
382 }
383
/* Marks "size" bytes as consumed in the iovec array "iov", starting
 * at entry *idx. Entries that are consumed completely are reset to
 * NULL/0 and *idx is moved past them; an entry that is consumed only
 * partially has its base advanced and its length reduced, and *idx
 * stays on it. The array must describe at least "size" bytes from
 * *idx onwards, as no bound on the array is checked here. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
        size_t remaining;

        for (remaining = size; remaining > 0; (*idx)++) {
                struct iovec *cur = &iov[*idx];

                if (remaining < cur->iov_len) {
                        /* Partially consumed: stay on this entry */
                        cur->iov_base = (uint8_t*) cur->iov_base + remaining;
                        cur->iov_len -= remaining;
                        return;
                }

                remaining -= cur->iov_len;

                cur->iov_base = NULL;
                cur->iov_len = 0;
        }
}
403
404 static int bus_write_auth(sd_bus *b) {
405         struct msghdr mh;
406         ssize_t k;
407
408         assert(b);
409         assert(b->state == BUS_AUTHENTICATING);
410
411         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
412                 return 0;
413
414         if (b->auth_timeout == 0)
415                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
416
417         zero(mh);
418         mh.msg_iov = b->auth_iovec + b->auth_index;
419         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
420
421         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
422         if (k < 0)
423                 return errno == EAGAIN ? 0 : -errno;
424
425         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
426
427         return 1;
428 }
429
430 static int bus_auth_verify(sd_bus *b) {
431         char *e, *f;
432         sd_id128_t peer;
433         unsigned i;
434         int r;
435
436         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
437          * that's it */
438
439         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
440         if (!e)
441                 return 0;
442
443         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
444         if (!f)
445                 return 0;
446
447         if (e - (char*) b->rbuffer != 3 + 32)
448                 return -EPERM;
449
450         if (memcmp(b->rbuffer, "OK ", 3))
451                 return -EPERM;
452
453         for (i = 0; i < 32; i += 2) {
454                 int x, y;
455
456                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
457                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
458
459                 if (x < 0 || y < 0)
460                         return -EINVAL;
461
462                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
463         }
464
465         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
466             !sd_id128_equal(b->peer, peer))
467                 return -EPERM;
468
469         b->peer = peer;
470
471         b->can_fds =
472                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
473                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
474
475         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
476         memmove(b->rbuffer, f + 2, b->rbuffer_size);
477
478         r = bus_start_running(b);
479         if (r < 0)
480                 return r;
481
482         return 1;
483 }
484
485 static int bus_read_auth(sd_bus *b) {
486         struct msghdr mh;
487         struct iovec iov;
488         size_t n;
489         ssize_t k;
490         int r;
491         void *p;
492
493         assert(b);
494
495         r = bus_auth_verify(b);
496         if (r != 0)
497                 return r;
498
499         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
500
501         if (n > BUS_AUTH_SIZE_MAX)
502                 n = BUS_AUTH_SIZE_MAX;
503
504         if (b->rbuffer_size >= n)
505                 return -ENOBUFS;
506
507         p = realloc(b->rbuffer, n);
508         if (!p)
509                 return -ENOMEM;
510
511         b->rbuffer = p;
512
513         zero(iov);
514         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
515         iov.iov_len = n - b->rbuffer_size;
516
517         zero(mh);
518         mh.msg_iov = &iov;
519         mh.msg_iovlen = 1;
520
521         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
522         if (k < 0)
523                 return errno == EAGAIN ? 0 : -errno;
524
525         b->rbuffer_size += k;
526
527         r = bus_auth_verify(b);
528         if (r != 0)
529                 return r;
530
531         return 1;
532 }
533
534 static int bus_setup_fd(sd_bus *b) {
535         int one;
536
537         assert(b);
538
539         /* Enable SO_PASSCRED + SO_PASSEC. We try this on any socket,
540          * just in case. This is actually irrelavant for */
541         one = 1;
542         setsockopt(b->fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
543         setsockopt(b->fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one));
544
545         /* Increase the buffers to a MB */
546         fd_inc_rcvbuf(b->fd, 1024*1024);
547         fd_inc_sndbuf(b->fd, 1024*1024);
548
549         return 0;
550 }
551
552 static int bus_start_auth(sd_bus *b) {
553         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
554         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
555
556         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
557         size_t l;
558
559         assert(b);
560
561         b->state = BUS_AUTHENTICATING;
562
563         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
564         char_array_0(text);
565
566         l = strlen(text);
567         b->auth_uid = hexmem(text, l);
568         if (!b->auth_uid)
569                 return -ENOMEM;
570
571         b->auth_iovec[0].iov_base = (void*) auth_prefix;
572         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
573         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
574         b->auth_iovec[1].iov_len = l * 2;
575         b->auth_iovec[2].iov_base = (void*) auth_suffix;
576         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
577         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
578
579         return bus_write_auth(b);
580 }
581
582 static int bus_start_connect(sd_bus *b) {
583         int r;
584
585         assert(b);
586         assert(b->fd < 0);
587
588         for (;;) {
589                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
590                         r = bus_parse_next_address(b);
591                         if (r < 0)
592                                 return r;
593                         if (r == 0)
594                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
595                 }
596
597                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
598                 if (b->fd < 0) {
599                         b->last_connect_error = errno;
600                         goto try_again;
601                 }
602
603                 r = bus_setup_fd(b);
604                 if (r < 0) {
605                         b->last_connect_error = errno;
606                         goto try_again;
607                 }
608
609                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
610                 if (r < 0) {
611                         if (errno == EINPROGRESS)
612                                 return 1;
613
614                         b->last_connect_error = errno;
615                         goto try_again;
616                 }
617
618                 return bus_start_auth(b);
619
620         try_again:
621                 zero(b->sockaddr);
622
623                 if (b->fd >= 0) {
624                         close_nointr_nofail(b->fd);
625                         b->fd = -1;
626                 }
627         }
628 }
629
630 int sd_bus_open_system(sd_bus **ret) {
631         const char *e;
632         sd_bus *b;
633         int r;
634
635         if (!ret)
636                 return -EINVAL;
637
638         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
639         if (e) {
640                 r = sd_bus_open_address(e, &b);
641                 if (r < 0)
642                         return r;
643         } else {
644                 b = bus_new();
645                 if (!b)
646                         return -ENOMEM;
647
648                 b->sockaddr.un.sun_family = AF_UNIX;
649                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
650                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
651
652                 r = bus_start_connect(b);
653                 if (r < 0) {
654                         bus_free(b);
655                         return r;
656                 }
657         }
658
659         r = bus_send_hello(b);
660         if (r < 0) {
661                 sd_bus_unref(b);
662                 return r;
663         }
664
665         *ret = b;
666         return 0;
667 }
668
669 int sd_bus_open_user(sd_bus **ret) {
670         const char *e;
671         sd_bus *b;
672         size_t l;
673         int r;
674
675         if (!ret)
676                 return -EINVAL;
677
678         e = getenv("DBUS_SESSION_BUS_ADDRESS");
679         if (e) {
680                 r = sd_bus_open_address(e, &b);
681                 if (r < 0)
682                         return r;
683         } else {
684                 e = getenv("XDG_RUNTIME_DIR");
685                 if (!e)
686                         return -ENOENT;
687
688                 l = strlen(e);
689                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
690                         return -E2BIG;
691
692                 b = bus_new();
693                 if (!b)
694                         return -ENOMEM;
695
696                 b->sockaddr.un.sun_family = AF_UNIX;
697                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
698                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
699
700                 r = bus_start_connect(b);
701                 if (r < 0) {
702                         bus_free(b);
703                         return r;
704                 }
705         }
706
707         r = bus_send_hello(b);
708         if (r < 0) {
709                 sd_bus_unref(b);
710                 return r;
711         }
712
713         *ret = b;
714         return 0;
715 }
716
717 int sd_bus_open_address(const char *address, sd_bus **ret) {
718         sd_bus *b;
719         int r;
720
721         if (!address)
722                 return -EINVAL;
723         if (!ret)
724                 return -EINVAL;
725
726         b = bus_new();
727         if (!b)
728                 return -ENOMEM;
729
730         b->address = strdup(address);
731         if (!b->address) {
732                 bus_free(b);
733                 return -ENOMEM;
734         }
735
736         r = bus_start_connect(b);
737         if (r < 0) {
738                 bus_free(b);
739                 return r;
740         }
741
742         *ret = b;
743         return 0;
744 }
745
746 int sd_bus_open_fd(int fd, sd_bus **ret) {
747         sd_bus *b;
748         int r;
749
750         if (fd < 0)
751                 return -EINVAL;
752         if (!ret)
753                 return -EINVAL;
754
755         b = bus_new();
756         if (!b)
757                 return -ENOMEM;
758
759         b->fd = fd;
760
761         r = fd_nonblock(b->fd, true);
762         if (r < 0)
763                 goto fail;
764
765         fd_cloexec(b->fd, true);
766         if (r < 0)
767                 goto fail;
768
769         r = bus_setup_fd(b);
770         if (r < 0)
771                 goto fail;
772
773         r = bus_start_auth(b);
774         if (r < 0)
775                 goto fail;
776
777         *ret = b;
778         return 0;
779
780 fail:
781                 bus_free(b);
782         return r;
783 }
784
785 void sd_bus_close(sd_bus *bus) {
786         if (!bus)
787                 return;
788         if (bus->fd < 0)
789                 return;
790
791         close_nointr_nofail(bus->fd);
792         bus->fd = -1;
793 }
794
795 sd_bus *sd_bus_ref(sd_bus *bus) {
796         if (!bus)
797                 return NULL;
798
799         assert(bus->n_ref > 0);
800
801         bus->n_ref++;
802         return bus;
803 }
804
805 sd_bus *sd_bus_unref(sd_bus *bus) {
806         if (!bus)
807                 return NULL;
808
809         assert(bus->n_ref > 0);
810         bus->n_ref--;
811
812         if (bus->n_ref <= 0)
813                 bus_free(bus);
814
815         return NULL;
816 }
817
818 int sd_bus_is_open(sd_bus *bus) {
819         if (!bus)
820                 return -EINVAL;
821
822         return bus->fd >= 0;
823 }
824
825 int sd_bus_can_send(sd_bus *bus, char type) {
826         int r;
827
828         if (!bus)
829                 return -EINVAL;
830
831         if (type == SD_BUS_TYPE_UNIX_FD) {
832                 r = bus_ensure_running(bus);
833                 if (r < 0)
834                         return r;
835
836                 return bus->can_fds;
837         }
838
839         return bus_type_is_valid(type);
840 }
841
842 int sd_bus_get_peer(sd_bus *bus, sd_id128_t *peer) {
843         int r;
844
845         if (!bus)
846                 return -EINVAL;
847         if (!peer)
848                 return -EINVAL;
849
850         r = bus_ensure_running(bus);
851         if (r < 0)
852                 return r;
853
854         *peer = bus->peer;
855         return 0;
856 }
857
858 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
859         assert(m);
860
861         if (m->header->version > b->message_version)
862                 return -EPERM;
863
864         if (m->sealed)
865                 return 0;
866
867         return bus_message_seal(m, ++b->serial);
868 }
869
870 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
871         struct msghdr mh;
872         struct iovec *iov;
873         ssize_t k;
874         size_t n;
875         unsigned j;
876
877         assert(bus);
878         assert(m);
879         assert(idx);
880         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
881
882         if (*idx >= m->size)
883                 return 0;
884
885         n = m->n_iovec * sizeof(struct iovec);
886         iov = alloca(n);
887         memcpy(iov, m->iovec, n);
888
889         j = 0;
890         iovec_advance(iov, &j, *idx);
891
892         zero(mh);
893         mh.msg_iov = iov;
894         mh.msg_iovlen = m->n_iovec;
895
896         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
897         if (k < 0)
898                 return errno == EAGAIN ? 0 : -errno;
899
900         *idx += (size_t) k;
901         return 1;
902 }
903
904 static int message_read_need(sd_bus *bus, size_t *need) {
905         uint32_t a, b;
906         uint8_t e;
907         uint64_t sum;
908
909         assert(bus);
910         assert(need);
911         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
912
913         if (bus->rbuffer_size < sizeof(struct bus_header)) {
914                 *need = sizeof(struct bus_header) + 8;
915
916                 /* Minimum message size:
917                  *
918                  * Header +
919                  *
920                  *  Method Call: +2 string headers
921                  *       Signal: +3 string headers
922                  * Method Error: +1 string headers
923                  *               +1 uint32 headers
924                  * Method Reply: +1 uint32 headers
925                  *
926                  * A string header is at least 9 bytes
927                  * A uint32 header is at least 8 bytes
928                  *
929                  * Hence the minimum message size of a valid message
930                  * is header + 8 bytes */
931
932                 return 0;
933         }
934
935         a = ((const uint32_t*) bus->rbuffer)[1];
936         b = ((const uint32_t*) bus->rbuffer)[3];
937
938         e = ((const uint8_t*) bus->rbuffer)[0];
939         if (e == SD_BUS_LITTLE_ENDIAN) {
940                 a = le32toh(a);
941                 b = le32toh(b);
942         } else if (e == SD_BUS_BIG_ENDIAN) {
943                 a = be32toh(a);
944                 b = be32toh(b);
945         } else
946                 return -EBADMSG;
947
948         sum = (uint64_t) sizeof(struct bus_header) + (uint64_t) ALIGN_TO(b, 8) + (uint64_t) a;
949         if (sum >= BUS_MESSAGE_SIZE_MAX)
950                 return -ENOBUFS;
951
952         *need = (size_t) sum;
953         return 0;
954 }
955
956 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
957         sd_bus_message *t;
958         void *b = NULL;
959         int r;
960
961         assert(bus);
962         assert(m);
963         assert(bus->rbuffer_size >= size);
964         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
965
966         if (bus->rbuffer_size > size) {
967                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
968                 if (!b) {
969                         free(t);
970                         return -ENOMEM;
971                 }
972         }
973
974         r = bus_message_from_malloc(bus->rbuffer, size,
975                                     bus->ucred_valid ? &bus->ucred : NULL,
976                                     bus->label[0] ? bus->label : NULL, &t);
977         if (r < 0) {
978                 free(b);
979                 return r;
980         }
981
982         bus->rbuffer = b;
983         bus->rbuffer_size -= size;
984
985         *m = t;
986         return 1;
987 }
988
989 static int message_read(sd_bus *bus, sd_bus_message **m) {
990         struct msghdr mh;
991         struct iovec iov;
992         ssize_t k;
993         size_t need;
994         int r;
995         void *b;
996         union {
997                 struct cmsghdr cmsghdr;
998                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
999                             CMSG_SPACE(NAME_MAX)]; /*selinux label */
1000         } control;
1001         struct cmsghdr *cmsg;
1002
1003         assert(bus);
1004         assert(m);
1005         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1006
1007         r = message_read_need(bus, &need);
1008         if (r < 0)
1009                 return r;
1010
1011         if (bus->rbuffer_size >= need)
1012                 return message_make(bus, need, m);
1013
1014         b = realloc(bus->rbuffer, need);
1015         if (!b)
1016                 return -ENOMEM;
1017
1018         bus->rbuffer = b;
1019
1020         zero(iov);
1021         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
1022         iov.iov_len = need - bus->rbuffer_size;
1023
1024         zero(mh);
1025         mh.msg_iov = &iov;
1026         mh.msg_iovlen = 1;
1027         mh.msg_control = &control;
1028         mh.msg_controllen = sizeof(control);
1029
1030         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL|MSG_CMSG_CLOEXEC);
1031         if (k < 0)
1032                 return errno == EAGAIN ? 0 : -errno;
1033
1034         bus->rbuffer_size += k;
1035         bus->ucred_valid = false;
1036         bus->label[0] = 0;
1037
1038         for (cmsg = CMSG_FIRSTHDR(&mh); cmsg; cmsg = CMSG_NXTHDR(&mh, cmsg)) {
1039                 if (cmsg->cmsg_level == SOL_SOCKET &&
1040                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1041                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) {
1042
1043                         memcpy(&bus->ucred, CMSG_DATA(cmsg), sizeof(struct ucred));
1044                         bus->ucred_valid = true;
1045
1046                 } else if (cmsg->cmsg_level == SOL_SOCKET &&
1047                          cmsg->cmsg_type == SCM_SECURITY) {
1048
1049                         size_t l;
1050                         l = cmsg->cmsg_len - CMSG_LEN(0);
1051                         memcpy(&bus->label, CMSG_DATA(cmsg), l);
1052                         bus->label[l] = 0;
1053                 }
1054         }
1055
1056         r = message_read_need(bus, &need);
1057         if (r < 0)
1058                 return r;
1059
1060         if (bus->rbuffer_size >= need)
1061                 return message_make(bus, need, m);
1062
1063         return 1;
1064 }
1065
1066 static int dispatch_wqueue(sd_bus *bus) {
1067         int r, ret = 0;
1068
1069         assert(bus);
1070         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1071
1072         if (bus->fd < 0)
1073                 return -ENOTCONN;
1074
1075         while (bus->wqueue_size > 0) {
1076
1077                 r = message_write(bus, bus->wqueue[0], &bus->windex);
1078                 if (r < 0) {
1079                         sd_bus_close(bus);
1080                         return r;
1081                 } else if (r == 0)
1082                         /* Didn't do anything this time */
1083                         return ret;
1084                 else if (bus->windex >= bus->wqueue[0]->size) {
1085                         /* Fully written. Let's drop the entry from
1086                          * the queue.
1087                          *
1088                          * This isn't particularly optimized, but
1089                          * well, this is supposed to be our worst-case
1090                          * buffer only, and the socket buffer is
1091                          * supposed to be our primary buffer, and if
1092                          * it got full, then all bets are off
1093                          * anyway. */
1094
1095                         sd_bus_message_unref(bus->wqueue[0]);
1096                         bus->wqueue_size --;
1097                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1098                         bus->windex = 0;
1099
1100                         ret = 1;
1101                 }
1102         }
1103
1104         return ret;
1105 }
1106
1107 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1108         sd_bus_message *z = NULL;
1109         int r, ret = 0;
1110
1111         assert(bus);
1112         assert(m);
1113         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1114
1115         if (bus->fd < 0)
1116                 return -ENOTCONN;
1117
1118         if (bus->rqueue_size > 0) {
1119                 /* Dispatch a queued message */
1120
1121                 *m = bus->rqueue[0];
1122                 bus->rqueue_size --;
1123                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1124                 return 1;
1125         }
1126
1127         /* Try to read a new message */
1128         do {
1129                 r = message_read(bus, &z);
1130                 if (r < 0) {
1131                         sd_bus_close(bus);
1132                         return r;
1133                 }
1134                 if (r == 0)
1135                         return ret;
1136
1137                 r = 1;
1138         } while (!z);
1139
1140         *m = z;
1141         return 1;
1142 }
1143
1144 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1145         int r;
1146
1147         if (!bus)
1148                 return -EINVAL;
1149         if (bus->fd < 0)
1150                 return -ENOTCONN;
1151         if (!m)
1152                 return -EINVAL;
1153
1154         /* If the serial number isn't kept, then we know that no reply
1155          * is expected */
1156         if (!serial && !m->sealed)
1157                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1158
1159         r = bus_seal_message(bus, m);
1160         if (r < 0)
1161                 return r;
1162
1163         /* If this is a reply and no reply was requested, then let's
1164          * suppress this, if we can */
1165         if (m->dont_send && !serial)
1166                 return 0;
1167
1168         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1169                 size_t idx = 0;
1170
1171                 r = message_write(bus, m, &idx);
1172                 if (r < 0) {
1173                         sd_bus_close(bus);
1174                         return r;
1175                 } else if (idx < m->size)  {
1176                         /* Wasn't fully written. So let's remember how
1177                          * much was written. Note that the first entry
1178                          * of the wqueue array is always allocated so
1179                          * that we always can remember how much was
1180                          * written. */
1181                         bus->wqueue[0] = sd_bus_message_ref(m);
1182                         bus->wqueue_size = 1;
1183                         bus->windex = idx;
1184                 }
1185         } else {
1186                 sd_bus_message **q;
1187
1188                 /* Just append it to the queue. */
1189
1190                 if (bus->wqueue_size >= BUS_WQUEUE_MAX)
1191                         return -ENOBUFS;
1192
1193                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1194                 if (!q)
1195                         return -ENOMEM;
1196
1197                 bus->wqueue = q;
1198                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1199         }
1200
1201         if (serial)
1202                 *serial = BUS_MESSAGE_SERIAL(m);
1203
1204         return 0;
1205 }
1206
1207 static usec_t calc_elapse(uint64_t usec) {
1208         if (usec == (uint64_t) -1)
1209                 return 0;
1210
1211         if (usec == 0)
1212                 usec = BUS_DEFAULT_TIMEOUT;
1213
1214         return now(CLOCK_MONOTONIC) + usec;
1215 }
1216
1217 static int timeout_compare(const void *a, const void *b) {
1218         const struct reply_callback *x = a, *y = b;
1219
1220         if (x->timeout != 0 && y->timeout == 0)
1221                 return -1;
1222
1223         if (x->timeout == 0 && y->timeout != 0)
1224                 return 1;
1225
1226         if (x->timeout < y->timeout)
1227                 return -1;
1228
1229         if (x->timeout > y->timeout)
1230                 return 1;
1231
1232         return 0;
1233 }
1234
1235 int sd_bus_send_with_reply(
1236                 sd_bus *bus,
1237                 sd_bus_message *m,
1238                 sd_message_handler_t callback,
1239                 void *userdata,
1240                 uint64_t usec,
1241                 uint64_t *serial) {
1242
1243         struct reply_callback *c;
1244         int r;
1245
1246         if (!bus)
1247                 return -EINVAL;
1248         if (bus->fd < 0)
1249                 return -ENOTCONN;
1250         if (!m)
1251                 return -EINVAL;
1252         if (!callback)
1253                 return -EINVAL;
1254         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1255                 return -EINVAL;
1256         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1257                 return -EINVAL;
1258
1259         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1260         if (r < 0)
1261                 return r;
1262
1263         if (usec != (uint64_t) -1) {
1264                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1265                 if (r < 0)
1266                         return r;
1267         }
1268
1269         r = bus_seal_message(bus, m);
1270         if (r < 0)
1271                 return r;
1272
1273         c = new(struct reply_callback, 1);
1274         if (!c)
1275                 return -ENOMEM;
1276
1277         c->callback = callback;
1278         c->userdata = userdata;
1279         c->serial = BUS_MESSAGE_SERIAL(m);
1280         c->timeout = calc_elapse(usec);
1281
1282         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1283         if (r < 0) {
1284                 free(c);
1285                 return r;
1286         }
1287
1288         if (c->timeout != 0) {
1289                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1290                 if (r < 0) {
1291                         c->timeout = 0;
1292                         sd_bus_send_with_reply_cancel(bus, c->serial);
1293                         return r;
1294                 }
1295         }
1296
1297         r = sd_bus_send(bus, m, serial);
1298         if (r < 0) {
1299                 sd_bus_send_with_reply_cancel(bus, c->serial);
1300                 return r;
1301         }
1302
1303         return r;
1304 }
1305
1306 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1307         struct reply_callback *c;
1308
1309         if (!bus)
1310                 return -EINVAL;
1311         if (serial == 0)
1312                 return -EINVAL;
1313
1314         c = hashmap_remove(bus->reply_callbacks, &serial);
1315         if (!c)
1316                 return 0;
1317
1318         if (c->timeout != 0)
1319                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1320
1321         free(c);
1322         return 1;
1323 }
1324
1325 int bus_ensure_running(sd_bus *bus) {
1326         int r;
1327
1328         assert(bus);
1329
1330         if (bus->state == BUS_RUNNING)
1331                 return 1;
1332
1333         for (;;) {
1334                 r = sd_bus_process(bus, NULL);
1335                 if (r < 0)
1336                         return r;
1337                 if (bus->state == BUS_RUNNING)
1338                         return 1;
1339                 if (r > 0)
1340                         continue;
1341
1342                 r = sd_bus_wait(bus, (uint64_t) -1);
1343                 if (r < 0)
1344                         return r;
1345         }
1346 }
1347
1348 int sd_bus_send_with_reply_and_block(
1349                 sd_bus *bus,
1350                 sd_bus_message *m,
1351                 uint64_t usec,
1352                 sd_bus_error *error,
1353                 sd_bus_message **reply) {
1354
1355         int r;
1356         usec_t timeout;
1357         uint64_t serial;
1358         bool room = false;
1359
1360         if (!bus)
1361                 return -EINVAL;
1362         if (bus->fd < 0)
1363                 return -ENOTCONN;
1364         if (!m)
1365                 return -EINVAL;
1366         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1367                 return -EINVAL;
1368         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1369                 return -EINVAL;
1370         if (bus_error_is_dirty(error))
1371                 return -EINVAL;
1372
1373         r = bus_ensure_running(bus);
1374         if (r < 0)
1375                 return r;
1376
1377         r = sd_bus_send(bus, m, &serial);
1378         if (r < 0)
1379                 return r;
1380
1381         timeout = calc_elapse(usec);
1382
1383         for (;;) {
1384                 usec_t left;
1385                 sd_bus_message *incoming = NULL;
1386
1387                 if (!room) {
1388                         sd_bus_message **q;
1389
1390                         if (bus->rqueue_size >= BUS_RQUEUE_MAX)
1391                                 return -ENOBUFS;
1392
1393                         /* Make sure there's room for queuing this
1394                          * locally, before we read the message */
1395
1396                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1397                         if (!q)
1398                                 return -ENOMEM;
1399
1400                         bus->rqueue = q;
1401                         room = true;
1402                 }
1403
1404                 r = message_read(bus, &incoming);
1405                 if (r < 0)
1406                         return r;
1407                 if (incoming) {
1408
1409                         if (incoming->reply_serial == serial) {
1410                                 /* Found a match! */
1411
1412                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1413                                         *reply = incoming;
1414                                         return 0;
1415                                 }
1416
1417                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1418                                         int k;
1419
1420                                         r = sd_bus_error_copy(error, &incoming->error);
1421                                         if (r < 0) {
1422                                                 sd_bus_message_unref(incoming);
1423                                                 return r;
1424                                         }
1425
1426                                         k = bus_error_to_errno(&incoming->error);
1427                                         sd_bus_message_unref(incoming);
1428                                         return k;
1429                                 }
1430
1431                                 sd_bus_message_unref(incoming);
1432                                 return -EIO;
1433                         }
1434
1435                         /* There's already guaranteed to be room for
1436                          * this, so need to resize things here */
1437                         bus->rqueue[bus->rqueue_size ++] = incoming;
1438                         room = false;
1439
1440                         /* Try to read more, right-away */
1441                         continue;
1442                 }
1443                 if (r != 0)
1444                         continue;
1445
1446                 if (timeout > 0) {
1447                         usec_t n;
1448
1449                         n = now(CLOCK_MONOTONIC);
1450                         if (n >= timeout)
1451                                 return -ETIMEDOUT;
1452
1453                         left = timeout - n;
1454                 } else
1455                         left = (uint64_t) -1;
1456
1457                 r = bus_poll(bus, true, left);
1458                 if (r < 0)
1459                         return r;
1460
1461                 r = dispatch_wqueue(bus);
1462                 if (r < 0)
1463                         return r;
1464         }
1465 }
1466
1467 int sd_bus_get_fd(sd_bus *bus) {
1468         if (!bus)
1469                 return -EINVAL;
1470
1471         if (bus->fd < 0)
1472                 return -ENOTCONN;
1473
1474         return bus->fd;
1475 }
1476
1477 int sd_bus_get_events(sd_bus *bus) {
1478         int flags = 0;
1479
1480         if (!bus)
1481                 return -EINVAL;
1482         if (bus->fd < 0)
1483                 return -ENOTCONN;
1484
1485         if (bus->state == BUS_OPENING)
1486                 flags |= POLLOUT;
1487         else if (bus->state == BUS_AUTHENTICATING) {
1488
1489                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1490                         flags |= POLLOUT;
1491
1492                 flags |= POLLIN;
1493
1494         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1495                 if (bus->rqueue_size <= 0)
1496                         flags |= POLLIN;
1497                 if (bus->wqueue_size > 0)
1498                         flags |= POLLOUT;
1499         }
1500
1501         return flags;
1502 }
1503
1504 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1505         struct reply_callback *c;
1506
1507         if (!bus)
1508                 return -EINVAL;
1509         if (!timeout_usec)
1510                 return -EINVAL;
1511         if (bus->fd < 0)
1512                 return -ENOTCONN;
1513
1514         if (bus->state == BUS_AUTHENTICATING) {
1515                 *timeout_usec = bus->auth_timeout;
1516                 return 1;
1517         }
1518
1519         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1520                 return 0;
1521
1522         c = prioq_peek(bus->reply_callbacks_prioq);
1523         if (!c)
1524                 return 0;
1525
1526         *timeout_usec = c->timeout;
1527         return 1;
1528 }
1529
1530 static int process_timeout(sd_bus *bus) {
1531         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1532         struct reply_callback *c;
1533         usec_t n;
1534         int r;
1535
1536         assert(bus);
1537
1538         c = prioq_peek(bus->reply_callbacks_prioq);
1539         if (!c)
1540                 return 0;
1541
1542         n = now(CLOCK_MONOTONIC);
1543         if (c->timeout > n)
1544                 return 0;
1545
1546         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1547         hashmap_remove(bus->reply_callbacks, &c->serial);
1548
1549         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1550         free(c);
1551
1552         return r < 0 ? r : 1;
1553 }
1554
1555 static int process_builtin(sd_bus *bus, sd_bus_message *m) {
1556         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1557         int r;
1558
1559         assert(bus);
1560         assert(m);
1561
1562         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1563                 return 0;
1564
1565         if (!streq_ptr(m->interface, "org.freedesktop.DBus.Peer"))
1566                 return 0;
1567
1568         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1569                 return 1;
1570
1571         if (streq_ptr(m->member, "Ping"))
1572                 r = sd_bus_message_new_method_return(bus, m, &reply);
1573         else if (streq_ptr(m->member, "GetMachineId")) {
1574                 sd_id128_t id;
1575                 char sid[33];
1576
1577                 r = sd_id128_get_machine(&id);
1578                 if (r < 0)
1579                         return r;
1580
1581                 r = sd_bus_message_new_method_return(bus, m, &reply);
1582                 if (r < 0)
1583                         return r;
1584
1585                 r = sd_bus_message_append(reply, "s", sd_id128_to_string(id, sid));
1586         } else {
1587                 _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1588
1589                 sd_bus_error_set(&error,
1590                                  "org.freedesktop.DBus.Error.UnknownMethod",
1591                                  "Unknown method '%s' on interface '%s'.", m->member, m->interface);
1592
1593                 r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1594         }
1595
1596         if (r < 0)
1597                 return r;
1598
1599         r = sd_bus_send(bus, reply, NULL);
1600         if (r < 0)
1601                 return r;
1602
1603         return 1;
1604 }
1605
1606 static int process_message(sd_bus *bus, sd_bus_message *m) {
1607         struct filter_callback *l;
1608         int r;
1609
1610         assert(bus);
1611         assert(m);
1612
1613         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1614                 struct reply_callback *c;
1615
1616                 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1617                 if (c) {
1618                         if (c->timeout != 0)
1619                                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1620
1621                         r = c->callback(bus, 0, m, c->userdata);
1622                         free(c);
1623
1624                         if (r != 0)
1625                                 return r;
1626                 }
1627         }
1628
1629         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1630                 r = l->callback(bus, 0, m, l->userdata);
1631                 if (r != 0)
1632                         return r;
1633         }
1634
1635         return process_builtin(bus, m);
1636 }
1637
1638 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1639         int r;
1640
1641         /* Returns 0 when we didn't do anything. This should cause the
1642          * caller to invoke sd_bus_wait() before returning the next
1643          * time. Returns > 0 when we did something, which possibly
1644          * means *ret is filled in with an unprocessed message. */
1645
1646         if (!bus)
1647                 return -EINVAL;
1648         if (bus->fd < 0)
1649                 return -ENOTCONN;
1650
1651         if (bus->state == BUS_OPENING) {
1652                 struct pollfd p;
1653
1654                 zero(p);
1655                 p.fd = bus->fd;
1656                 p.events = POLLOUT;
1657
1658                 r = poll(&p, 1, 0);
1659                 if (r < 0)
1660                         return -errno;
1661
1662                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1663                         int error = 0;
1664                         socklen_t slen = sizeof(error);
1665
1666                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1667                         if (r < 0)
1668                                 bus->last_connect_error = errno;
1669                         else if (error != 0)
1670                                 bus->last_connect_error = error;
1671                         else if (p.revents & (POLLERR|POLLHUP))
1672                                 bus->last_connect_error = ECONNREFUSED;
1673                         else {
1674                                 r = bus_start_auth(bus);
1675                                 goto null_message;
1676                         }
1677
1678                         /* Try next address */
1679                         r = bus_start_connect(bus);
1680                         goto null_message;
1681                 }
1682
1683                 r = 0;
1684                 goto null_message;
1685
1686         } else if (bus->state == BUS_AUTHENTICATING) {
1687
1688                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1689                         return -ETIMEDOUT;
1690
1691                 r = bus_write_auth(bus);
1692                 if (r != 0)
1693                         goto null_message;
1694
1695                 r = bus_read_auth(bus);
1696                 goto null_message;
1697
1698         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1699                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1700                 int k;
1701
1702                 r = process_timeout(bus);
1703                 if (r != 0)
1704                         goto null_message;
1705
1706                 r = dispatch_wqueue(bus);
1707                 if (r != 0)
1708                         goto null_message;
1709
1710                 k = r;
1711                 r = dispatch_rqueue(bus, &m);
1712                 if (r < 0)
1713                         return r;
1714                 if (!m) {
1715                         if (r == 0)
1716                                 r = k;
1717                         goto null_message;
1718                 }
1719
1720                 r = process_message(bus, m);
1721                 if (r != 0)
1722                         goto null_message;
1723
1724                 if (ret) {
1725                         *ret = m;
1726                         m = NULL;
1727                         return 1;
1728                 }
1729
1730                 if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL) {
1731                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1732                         _cleanup_bus_error_free_ sd_bus_error error = SD_BUS_ERROR_INIT;
1733
1734                         sd_bus_error_set(&error, "org.freedesktop.DBus.Error.UnknownObject", "Unknown object '%s'.", m->path);
1735
1736                         r = sd_bus_message_new_method_error(bus, m, &error, &reply);
1737                         if (r < 0)
1738                                 return r;
1739
1740                         r = sd_bus_send(bus, reply, NULL);
1741                         if (r < 0)
1742                                 return r;
1743                 }
1744
1745                 return 1;
1746         }
1747
1748         assert_not_reached("Unknown state");
1749
1750 null_message:
1751         if (r >= 0 && ret)
1752                 *ret = NULL;
1753
1754         return r;
1755 }
1756
1757 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1758         struct pollfd p;
1759         int r, e;
1760         struct timespec ts;
1761         usec_t until, m;
1762
1763         assert(bus);
1764
1765         if (bus->fd < 0)
1766                 return -ENOTCONN;
1767
1768         e = sd_bus_get_events(bus);
1769         if (e < 0)
1770                 return e;
1771
1772         if (need_more)
1773                 e |= POLLIN;
1774
1775         r = sd_bus_get_timeout(bus, &until);
1776         if (r < 0)
1777                 return r;
1778         if (r == 0)
1779                 m = (uint64_t) -1;
1780         else {
1781                 usec_t n;
1782                 n = now(CLOCK_MONOTONIC);
1783                 m = until > n ? until - n : 0;
1784         }
1785
1786         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1787                 m = timeout_usec;
1788
1789         zero(p);
1790         p.fd = bus->fd;
1791         p.events = e;
1792
1793         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1794         if (r < 0)
1795                 return -errno;
1796
1797         return r > 0 ? 1 : 0;
1798 }
1799
1800 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1801
1802         if (!bus)
1803                 return -EINVAL;
1804         if (bus->fd < 0)
1805                 return -ENOTCONN;
1806         if (bus->rqueue_size > 0)
1807                 return 0;
1808
1809         return bus_poll(bus, false, timeout_usec);
1810 }
1811
1812 int sd_bus_flush(sd_bus *bus) {
1813         int r;
1814
1815         if (!bus)
1816                 return -EINVAL;
1817         if (bus->fd < 0)
1818                 return -ENOTCONN;
1819
1820         r = bus_ensure_running(bus);
1821         if (r < 0)
1822                 return r;
1823
1824         if (bus->wqueue_size <= 0)
1825                 return 0;
1826
1827         for (;;) {
1828                 r = dispatch_wqueue(bus);
1829                 if (r < 0)
1830                         return r;
1831
1832                 if (bus->wqueue_size <= 0)
1833                         return 0;
1834
1835                 r = bus_poll(bus, false, (uint64_t) -1);
1836                 if (r < 0)
1837                         return r;
1838         }
1839 }
1840
1841 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1842         struct filter_callback *f;
1843
1844         if (!bus)
1845                 return -EINVAL;
1846         if (!callback)
1847                 return -EINVAL;
1848
1849         f = new(struct filter_callback, 1);
1850         if (!f)
1851                 return -ENOMEM;
1852         f->callback = callback;
1853         f->userdata = userdata;
1854
1855         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1856         return 0;
1857 }
1858
1859 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1860         struct filter_callback *f;
1861
1862         if (!bus)
1863                 return -EINVAL;
1864         if (!callback)
1865                 return -EINVAL;
1866
1867         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1868                 if (f->callback == callback && f->userdata == userdata) {
1869                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1870                         free(f);
1871                         return 1;
1872                 }
1873         }
1874
1875         return 0;
1876 }