chiark / gitweb /
bus: implement full method call timeout logic
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 #define WQUEUE_MAX 128
39
40 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41
42 static void bus_free(sd_bus *b) {
43         struct filter_callback *f;
44         unsigned i;
45
46         assert(b);
47
48         if (b->fd >= 0)
49                 close_nointr_nofail(b->fd);
50
51         free(b->rbuffer);
52         free(b->unique_name);
53         free(b->auth_uid);
54         free(b->address);
55
56         for (i = 0; i < b->rqueue_size; i++)
57                 sd_bus_message_unref(b->rqueue[i]);
58         free(b->rqueue);
59
60         for (i = 0; i < b->wqueue_size; i++)
61                 sd_bus_message_unref(b->wqueue[i]);
62         free(b->wqueue);
63
64         hashmap_free_free(b->reply_callbacks);
65         prioq_free(b->reply_callbacks_prioq);
66
67         while ((f = b->filter_callbacks)) {
68                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
69                 free(f);
70         }
71
72         free(b);
73 }
74
75 static sd_bus* bus_new(void) {
76         sd_bus *r;
77
78         r = new0(sd_bus, 1);
79         if (!r)
80                 return NULL;
81
82         r->n_ref = 1;
83         r->fd = -1;
84         r->message_version = 1;
85
86         /* We guarantee that wqueue always has space for at least one
87          * entry */
88         r->wqueue = new(sd_bus_message*, 1);
89         if (!r->wqueue) {
90                 free(r);
91                 return NULL;
92         }
93
94         return r;
95 };
96
97 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
98         const char *s;
99         int r;
100
101         assert(bus);
102
103         if (error != 0)
104                 return -error;
105
106         assert(reply);
107
108         bus->state = BUS_RUNNING;
109
110         r = sd_bus_message_read(reply, "s", &s);
111         if (r < 0)
112                 return r;
113
114         bus->unique_name = strdup(s);
115         if (!bus->unique_name)
116                 return -ENOMEM;
117
118         return 1;
119 }
120
121 static int bus_send_hello(sd_bus *bus) {
122         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
123         int r;
124
125         assert(bus);
126
127         r = sd_bus_message_new_method_call(
128                         bus,
129                         "org.freedesktop.DBus",
130                         "/",
131                         "org.freedesktop.DBus",
132                         "Hello",
133                         &m);
134         if (r < 0)
135                 return r;
136
137         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
138         if (r < 0)
139                 return r;
140
141         bus->sent_hello = true;
142         return r;
143 }
144
145 static int bus_start_running(sd_bus *bus) {
146         assert(bus);
147
148         if (bus->sent_hello) {
149                 bus->state = BUS_HELLO;
150                 return 1;
151         }
152
153         bus->state = BUS_RUNNING;
154         return 1;
155 }
156
/* Tries to parse a "key=value" item at *p, where the value runs up to the
 * next ',' or the end of the string and may contain %XX escapes.
 *
 * Returns 0 (leaving *p untouched) if the text at *p is not "<key>=".
 * Returns -EINVAL if *value has already been set, the negative result of
 * unhexchar() for a malformed escape, or -ENOMEM on allocation failure.
 * On success returns 1, stores the newly allocated, unescaped value in
 * *value and moves *p past the value and one trailing ',' if present. */
static int parse_address_key(const char **p, const char *key, char **value) {
        size_t key_len, used = 0;
        const char *cur;
        char *buf = NULL;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        key_len = strlen(key);
        if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                return 0;

        /* The same key must not show up twice */
        if (*value)
                return -EINVAL;

        cur = *p + key_len + 1;
        while (*cur != 0 && *cur != ',') {
                char ch, *grown;

                if (*cur != '%')
                        ch = *(cur++);
                else {
                        int hi, lo;

                        hi = unhexchar(cur[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cur[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        ch = (char) ((hi << 4) | lo);
                        cur += 3;
                }

                /* One byte for the new character, one for the final NUL */
                grown = realloc(buf, used + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[used++] = ch;
        }

        if (buf)
                buf[used] = 0;
        else {
                /* Empty value: still hand out an allocated string */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cur == ',')
                cur++;

        *p = cur;
        *value = buf;
        return 1;
}
227
/* Moves *p past the current item: up to the next ',' and, if there is one,
 * past that ',' as well. If no ',' follows, *p ends up at the terminating
 * NUL. */
static void skip_address_key(const char **p) {
        const char *q;

        assert(p);
        assert(*p);

        q = *p + strcspn(*p, ",");
        if (*q == ',')
                q++;

        *p = q;
}
237
238 static int bus_parse_next_address(sd_bus *b) {
239         const char *a, *p;
240         _cleanup_free_ char *guid = NULL;
241         int r;
242
243         assert(b);
244
245         if (!b->address)
246                 return 0;
247         if (b->address[b->address_index] == 0)
248                 return 0;
249
250         a = b->address + b->address_index;
251
252         zero(b->sockaddr);
253         b->sockaddr_size = 0;
254         b->peer = SD_ID128_NULL;
255
256         if (startswith(a, "unix:")) {
257                 _cleanup_free_ char *path = NULL, *abstract = NULL;
258
259                 p = a + 5;
260                 while (*p != 0) {
261                         r = parse_address_key(&p, "guid", &guid);
262                         if (r < 0)
263                                 return r;
264                         else if (r > 0)
265                                 continue;
266
267                         r = parse_address_key(&p, "path", &path);
268                         if (r < 0)
269                                 return r;
270                         else if (r > 0)
271                                 continue;
272
273                         r = parse_address_key(&p, "abstract", &abstract);
274                         if (r < 0)
275                                 return r;
276                         else if (r > 0)
277                                 continue;
278
279                         skip_address_key(&p);
280                 }
281
282                 if (!path && !abstract)
283                         return -EINVAL;
284
285                 if (path && abstract)
286                         return -EINVAL;
287
288                 if (path) {
289                         size_t l;
290
291                         l = strlen(path);
292                         if (l > sizeof(b->sockaddr.un.sun_path))
293                                 return -E2BIG;
294
295                         b->sockaddr.un.sun_family = AF_UNIX;
296                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
297                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
298                 } else if (abstract) {
299                         size_t l;
300
301                         l = strlen(abstract);
302                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
303                                 return -E2BIG;
304
305                         b->sockaddr.un.sun_family = AF_UNIX;
306                         b->sockaddr.un.sun_path[0] = 0;
307                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
308                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
309                 }
310
311         } else if (startswith(a, "tcp:")) {
312                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
313                 struct addrinfo hints, *result;
314
315                 p = a + 4;
316                 while (*p != 0) {
317                         r = parse_address_key(&p, "guid", &guid);
318                         if (r < 0)
319                                 return r;
320                         else if (r > 0)
321                                 continue;
322
323                         r = parse_address_key(&p, "host", &host);
324                         if (r < 0)
325                                 return r;
326                         else if (r > 0)
327                                 continue;
328
329                         r = parse_address_key(&p, "port", &port);
330                         if (r < 0)
331                                 return r;
332                         else if (r > 0)
333                                 continue;
334
335                         r = parse_address_key(&p, "family", &family);
336                         if (r < 0)
337                                 return r;
338                         else if (r > 0)
339                                 continue;
340
341                         skip_address_key(&p);
342                 }
343
344                 if (!host || !port)
345                         return -EINVAL;
346
347                 zero(hints);
348                 hints.ai_socktype = SOCK_STREAM;
349                 hints.ai_flags = AI_ADDRCONFIG;
350
351                 if (family) {
352                         if (streq(family, "ipv4"))
353                                 hints.ai_family = AF_INET;
354                         else if (streq(family, "ipv6"))
355                                 hints.ai_family = AF_INET6;
356                         else
357                                 return -EINVAL;
358                 }
359
360                 r = getaddrinfo(host, port, &hints, &result);
361                 if (r == EAI_SYSTEM)
362                         return -errno;
363                 else if (r != 0)
364                         return -EADDRNOTAVAIL;
365
366                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
367                 b->sockaddr_size = result->ai_addrlen;
368
369                 freeaddrinfo(result);
370         }
371
372         if (guid) {
373                 r = sd_id128_from_string(guid, &b->peer);
374                 if (r < 0)
375                         return r;
376         }
377
378         b->address_index = p - b->address;
379         return 1;
380 }
381
/* Consumes 'size' bytes from the iovec array, starting at entry *idx.
 * Entries that are used up completely are reset to NULL/0 and *idx is
 * moved on to the next one; an entry that is only partly used up has its
 * base advanced and its length reduced accordingly. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {

        while (size > 0) {
                struct iovec *cur = &iov[*idx];

                if (size < cur->iov_len) {
                        /* The remainder ends inside this entry */
                        cur->iov_base = (uint8_t*) cur->iov_base + size;
                        cur->iov_len -= size;
                        break;
                }

                /* This entry is eaten up entirely, go on with the next */
                size -= cur->iov_len;

                cur->iov_len = 0;
                cur->iov_base = NULL;

                ++(*idx);
        }
}
401
402 static int bus_write_auth(sd_bus *b) {
403         struct msghdr mh;
404         ssize_t k;
405
406         assert(b);
407         assert(b->state == BUS_AUTHENTICATING);
408
409         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
410                 return 0;
411
412         if (b->auth_timeout == 0)
413                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
414
415         zero(mh);
416         mh.msg_iov = b->auth_iovec + b->auth_index;
417         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
418
419         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
420         if (k < 0)
421                 return errno == EAGAIN ? 0 : -errno;
422
423         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
424
425         return 1;
426 }
427
428 static int bus_auth_verify(sd_bus *b) {
429         char *e, *f;
430         sd_id128_t peer;
431         unsigned i;
432         int r;
433
434         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
435          * that's it */
436
437         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
438         if (!e)
439                 return 0;
440
441         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
442         if (!f)
443                 return 0;
444
445         if (e - (char*) b->rbuffer != 3 + 32)
446                 return -EPERM;
447
448         if (memcmp(b->rbuffer, "OK ", 3))
449                 return -EPERM;
450
451         for (i = 0; i < 32; i += 2) {
452                 int x, y;
453
454                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
455                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
456
457                 if (x < 0 || y < 0)
458                         return -EINVAL;
459
460                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
461         }
462
463         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
464             !sd_id128_equal(b->peer, peer))
465                 return -EPERM;
466
467         b->peer = peer;
468
469         b->can_fds =
470                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
471                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
472
473         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
474         memmove(b->rbuffer, f + 2, b->rbuffer_size);
475
476         r = bus_start_running(b);
477         if (r < 0)
478                 return r;
479
480         return 1;
481 }
482
483 static int bus_read_auth(sd_bus *b) {
484         struct msghdr mh;
485         struct iovec iov;
486         size_t n;
487         ssize_t k;
488         int r;
489         void *p;
490
491         assert(b);
492
493         r = bus_auth_verify(b);
494         if (r != 0)
495                 return r;
496
497         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
498         p = realloc(b->rbuffer, n);
499         if (!p)
500                 return -ENOMEM;
501
502         b->rbuffer = p;
503
504         zero(iov);
505         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
506         iov.iov_len = n - b->rbuffer_size;
507
508         zero(mh);
509         mh.msg_iov = &iov;
510         mh.msg_iovlen = 1;
511
512         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
513         if (k < 0)
514                 return errno == EAGAIN ? 0 : -errno;
515
516         b->rbuffer_size += k;
517
518         r = bus_auth_verify(b);
519         if (r != 0)
520                 return r;
521
522         return 1;
523 }
524
525 static int bus_start_auth(sd_bus *b) {
526         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
527         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
528
529         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
530         size_t l;
531
532         assert(b);
533
534         b->state = BUS_AUTHENTICATING;
535
536         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
537         char_array_0(text);
538
539         l = strlen(text);
540         b->auth_uid = hexmem(text, l);
541         if (!b->auth_uid)
542                 return -ENOMEM;
543
544         b->auth_iovec[0].iov_base = (void*) auth_prefix;
545         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
546         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
547         b->auth_iovec[1].iov_len = l * 2;
548         b->auth_iovec[2].iov_base = (void*) auth_suffix;
549         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
550         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
551
552         return bus_write_auth(b);
553 }
554
555 static int bus_start_connect(sd_bus *b) {
556         int r;
557
558         assert(b);
559         assert(b->fd < 0);
560
561         for (;;) {
562                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
563                         r = bus_parse_next_address(b);
564                         if (r < 0)
565                                 return r;
566                         if (r == 0)
567                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
568                 }
569
570                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
571                 if (b->fd < 0) {
572                         b->last_connect_error = errno;
573                         zero(b->sockaddr);
574                         continue;
575                 }
576
577                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
578                 if (r < 0) {
579                         if (errno == EINPROGRESS)
580                                 return 1;
581
582                         b->last_connect_error = errno;
583                         close_nointr_nofail(b->fd);
584                         b->fd = -1;
585                         zero(b->sockaddr);
586                         continue;
587                 }
588
589                 return bus_start_auth(b);
590         }
591 }
592
593 int sd_bus_open_system(sd_bus **ret) {
594         const char *e;
595         sd_bus *b;
596         int r;
597
598         if (!ret)
599                 return -EINVAL;
600
601         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
602         if (e) {
603                 r = sd_bus_open_address(e, &b);
604                 if (r < 0)
605                         return r;
606         } else {
607                 b = bus_new();
608                 if (!b)
609                         return -ENOMEM;
610
611                 b->sockaddr.un.sun_family = AF_UNIX;
612                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
613                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
614
615                 r = bus_start_connect(b);
616                 if (r < 0) {
617                         bus_free(b);
618                         return r;
619                 }
620         }
621
622         r = bus_send_hello(b);
623         if (r < 0) {
624                 sd_bus_unref(b);
625                 return r;
626         }
627
628         *ret = b;
629         return 0;
630 }
631
632 int sd_bus_open_user(sd_bus **ret) {
633         const char *e;
634         sd_bus *b;
635         size_t l;
636         int r;
637
638         if (!ret)
639                 return -EINVAL;
640
641         e = getenv("DBUS_SESSION_BUS_ADDRESS");
642         if (e) {
643                 r = sd_bus_open_address(e, &b);
644                 if (r < 0)
645                         return r;
646         } else {
647                 e = getenv("XDG_RUNTIME_DIR");
648                 if (!e)
649                         return -ENOENT;
650
651                 l = strlen(e);
652                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
653                         return -E2BIG;
654
655                 b = bus_new();
656                 if (!b)
657                         return -ENOMEM;
658
659                 b->sockaddr.un.sun_family = AF_UNIX;
660                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
661                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
662
663                 r = bus_start_connect(b);
664                 if (r < 0) {
665                         bus_free(b);
666                         return r;
667                 }
668         }
669
670         r = bus_send_hello(b);
671         if (r < 0) {
672                 sd_bus_unref(b);
673                 return r;
674         }
675
676         *ret = b;
677         return 0;
678 }
679
680 int sd_bus_open_address(const char *address, sd_bus **ret) {
681         sd_bus *b;
682         int r;
683
684         if (!address)
685                 return -EINVAL;
686         if (!ret)
687                 return -EINVAL;
688
689         b = bus_new();
690         if (!b)
691                 return -ENOMEM;
692
693         b->address = strdup(address);
694         if (!b->address) {
695                 bus_free(b);
696                 return -ENOMEM;
697         }
698
699         r = bus_start_connect(b);
700         if (r < 0) {
701                 bus_free(b);
702                 return r;
703         }
704
705         *ret = b;
706         return 0;
707 }
708
709 int sd_bus_open_fd(int fd, sd_bus **ret) {
710         sd_bus *b;
711         int r;
712
713         if (fd < 0)
714                 return -EINVAL;
715         if (!ret)
716                 return -EINVAL;
717
718         b = bus_new();
719         if (!b)
720                 return -ENOMEM;
721
722         b->fd = fd;
723         fd_nonblock(b->fd, true);
724         fd_cloexec(b->fd, true);
725
726         r = bus_start_auth(b);
727         if (r < 0) {
728                 bus_free(b);
729                 return r;
730         }
731
732         *ret = b;
733         return 0;
734 }
735
736 void sd_bus_close(sd_bus *bus) {
737         if (!bus)
738                 return;
739         if (bus->fd < 0)
740                 return;
741
742         close_nointr_nofail(bus->fd);
743         bus->fd = -1;
744 }
745
746 sd_bus *sd_bus_ref(sd_bus *bus) {
747         if (!bus)
748                 return NULL;
749
750         assert(bus->n_ref > 0);
751
752         bus->n_ref++;
753         return bus;
754 }
755
756 sd_bus *sd_bus_unref(sd_bus *bus) {
757         if (!bus)
758                 return NULL;
759
760         assert(bus->n_ref > 0);
761         bus->n_ref--;
762
763         if (bus->n_ref <= 0)
764                 bus_free(bus);
765
766         return NULL;
767 }
768
769 int sd_bus_is_open(sd_bus *bus) {
770         if (!bus)
771                 return -EINVAL;
772
773         return bus->fd >= 0;
774 }
775
776 int sd_bus_is_running(sd_bus *bus) {
777         if (!bus)
778                 return -EINVAL;
779
780         if (bus->fd < 0)
781                 return -ENOTCONN;
782
783         return bus->state == BUS_RUNNING;
784 }
785
786 int sd_bus_can_send(sd_bus *bus, char type) {
787
788         if (!bus)
789                 return -EINVAL;
790         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
791                 return -EAGAIN;
792
793         if (type == SD_BUS_TYPE_UNIX_FD)
794                 return bus->can_fds;
795
796         return bus_type_is_valid(type);
797 }
798
799 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
800         assert(m);
801
802         if (m->header->version > b->message_version)
803                 return -EPERM;
804
805         if (m->sealed)
806                 return 0;
807
808         return bus_message_seal(m, ++b->serial);
809 }
810
811 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
812         struct msghdr mh;
813         struct iovec *iov;
814         ssize_t k;
815         size_t n;
816         unsigned j;
817
818         assert(bus);
819         assert(m);
820         assert(idx);
821         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
822
823         if (*idx >= m->size)
824                 return 0;
825
826         n = m->n_iovec * sizeof(struct iovec);
827         iov = alloca(n);
828         memcpy(iov, m->iovec, n);
829
830         j = 0;
831         iovec_advance(iov, &j, *idx);
832
833         zero(mh);
834         mh.msg_iov = iov;
835         mh.msg_iovlen = m->n_iovec;
836
837         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
838         if (k < 0)
839                 return errno == EAGAIN ? 0 : -errno;
840
841         *idx += (size_t) k;
842         return 1;
843 }
844
845 static int message_read_need(sd_bus *bus, size_t *need) {
846         uint32_t a, b;
847         uint8_t e;
848
849         assert(bus);
850         assert(need);
851         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
852
853         if (bus->rbuffer_size < sizeof(struct bus_header)) {
854                 *need = sizeof(struct bus_header) + 8;
855
856                 /* Minimum message size:
857                  *
858                  * Header +
859                  *
860                  *  Method Call: +2 string headers
861                  *       Signal: +3 string headers
862                  * Method Error: +1 string headers
863                  *               +1 uint32 headers
864                  * Method Reply: +1 uint32 headers
865                  *
866                  * A string header is at least 9 bytes
867                  * A uint32 header is at least 8 bytes
868                  *
869                  * Hence the minimum message size of a valid message
870                  * is header + 8 bytes */
871
872                 return 0;
873         }
874
875         a = ((const uint32_t*) bus->rbuffer)[1];
876         b = ((const uint32_t*) bus->rbuffer)[3];
877
878         e = ((const uint8_t*) bus->rbuffer)[0];
879         if (e == SD_BUS_LITTLE_ENDIAN) {
880                 a = le32toh(a);
881                 b = le32toh(b);
882         } else if (e == SD_BUS_BIG_ENDIAN) {
883                 a = be32toh(a);
884                 b = be32toh(b);
885         } else
886                 return -EBADMSG;
887
888         *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
889         return 0;
890 }
891
892 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
893         sd_bus_message *t;
894         void *b = NULL;
895         int r;
896
897         assert(bus);
898         assert(m);
899         assert(bus->rbuffer_size >= size);
900         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
901
902         if (bus->rbuffer_size > size) {
903                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
904                 if (!b) {
905                         free(t);
906                         return -ENOMEM;
907                 }
908         }
909
910         r = bus_message_from_malloc(bus->rbuffer, size, &t);
911         if (r < 0) {
912                 free(b);
913                 return r;
914         }
915
916         bus->rbuffer = b;
917         bus->rbuffer_size -= size;
918
919         *m = t;
920         return 1;
921 }
922
923 static int message_read(sd_bus *bus, sd_bus_message **m) {
924         struct msghdr mh;
925         struct iovec iov;
926         ssize_t k;
927         size_t need;
928         int r;
929         void *b;
930
931         assert(bus);
932         assert(m);
933         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
934
935         r = message_read_need(bus, &need);
936         if (r < 0)
937                 return r;
938
939         if (bus->rbuffer_size >= need)
940                 return message_make(bus, need, m);
941
942         b = realloc(bus->rbuffer, need);
943         if (!b)
944                 return -ENOMEM;
945
946         bus->rbuffer = b;
947
948         zero(iov);
949         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
950         iov.iov_len = need - bus->rbuffer_size;
951
952         zero(mh);
953         mh.msg_iov = &iov;
954         mh.msg_iovlen = 1;
955
956         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
957         if (k < 0)
958                 return errno == EAGAIN ? 0 : -errno;
959
960         bus->rbuffer_size += k;
961
962         r = message_read_need(bus, &need);
963         if (r < 0)
964                 return r;
965
966         if (bus->rbuffer_size >= need)
967                 return message_make(bus, need, m);
968
969         return 1;
970 }
971
972 static int dispatch_wqueue(sd_bus *bus) {
973         int r, ret = 0;
974
975         assert(bus);
976         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
977
978         if (bus->fd < 0)
979                 return -ENOTCONN;
980
981         while (bus->wqueue_size > 0) {
982
983                 r = message_write(bus, bus->wqueue[0], &bus->windex);
984                 if (r < 0) {
985                         sd_bus_close(bus);
986                         return r;
987                 } else if (r == 0)
988                         /* Didn't do anything this time */
989                         return ret;
990                 else if (bus->windex >= bus->wqueue[0]->size) {
991                         /* Fully written. Let's drop the entry from
992                          * the queue.
993                          *
994                          * This isn't particularly optimized, but
995                          * well, this is supposed to be our worst-case
996                          * buffer only, and the socket buffer is
997                          * supposed to be our primary buffer, and if
998                          * it got full, then all bets are off
999                          * anyway. */
1000
1001                         sd_bus_message_unref(bus->wqueue[0]);
1002                         bus->wqueue_size --;
1003                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1004                         bus->windex = 0;
1005
1006                         ret = 1;
1007                 }
1008         }
1009
1010         return ret;
1011 }
1012
1013 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1014         sd_bus_message *z;
1015         int r, ret = 0;
1016
1017         assert(bus);
1018         assert(m);
1019         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1020
1021         if (bus->fd < 0)
1022                 return -ENOTCONN;
1023
1024         if (bus->rqueue_size > 0) {
1025                 /* Dispatch a queued message */
1026
1027                 *m = bus->rqueue[0];
1028                 bus->rqueue_size --;
1029                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1030                 return 1;
1031         }
1032
1033         /* Try to read a new message */
1034         do {
1035                 r = message_read(bus, &z);
1036                 if (r < 0) {
1037                         sd_bus_close(bus);
1038                         return r;
1039                 }
1040                 if (r == 0)
1041                         return ret;
1042
1043                 r = 1;
1044         } while (!z);
1045
1046         *m = z;
1047         return 1;
1048 }
1049
1050 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1051         int r;
1052
1053         if (!bus)
1054                 return -EINVAL;
1055         if (bus->fd < 0)
1056                 return -ENOTCONN;
1057         if (!m)
1058                 return -EINVAL;
1059
1060         r = bus_seal_message(bus, m);
1061         if (r < 0)
1062                 return r;
1063
1064         /* If this is a reply and no reply was requested, then let's
1065          * suppress this, if we can */
1066         if (m->dont_send && !serial)
1067                 return 0;
1068
1069         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1070                 size_t idx = 0;
1071
1072                 r = message_write(bus, m, &idx);
1073                 if (r < 0) {
1074                         sd_bus_close(bus);
1075                         return r;
1076                 } else if (idx < m->size)  {
1077                         /* Wasn't fully written. So let's remember how
1078                          * much was written. Note that the first entry
1079                          * of the wqueue array is always allocated so
1080                          * that we always can remember how much was
1081                          * written. */
1082                         bus->wqueue[0] = sd_bus_message_ref(m);
1083                         bus->wqueue_size = 1;
1084                         bus->windex = idx;
1085                 }
1086         } else {
1087                 sd_bus_message **q;
1088
1089                 /* Just append it to the queue. */
1090
1091                 if (bus->wqueue_size >= WQUEUE_MAX)
1092                         return -ENOBUFS;
1093
1094                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1095                 if (!q)
1096                         return -ENOMEM;
1097
1098                 bus->wqueue = q;
1099                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1100         }
1101
1102         if (serial)
1103                 *serial = BUS_MESSAGE_SERIAL(m);
1104
1105         return 0;
1106 }
1107
1108 static usec_t calc_elapse(uint64_t usec) {
1109         if (usec == (uint64_t) -1)
1110                 return 0;
1111
1112         if (usec == 0)
1113                 usec = BUS_DEFAULT_TIMEOUT;
1114
1115         return now(CLOCK_MONOTONIC) + usec;
1116 }
1117
1118 static int timeout_compare(const void *a, const void *b) {
1119         const struct reply_callback *x = a, *y = b;
1120
1121         if (x->timeout != 0 && y->timeout == 0)
1122                 return -1;
1123
1124         if (x->timeout == 0 && y->timeout != 0)
1125                 return 1;
1126
1127         if (x->timeout < y->timeout)
1128                 return -1;
1129
1130         if (x->timeout > y->timeout)
1131                 return 1;
1132
1133         return 0;
1134 }
1135
1136 int sd_bus_send_with_reply(
1137                 sd_bus *bus,
1138                 sd_bus_message *m,
1139                 sd_message_handler_t callback,
1140                 void *userdata,
1141                 uint64_t usec,
1142                 uint64_t *serial) {
1143
1144         struct reply_callback *c;
1145         int r;
1146
1147         if (!bus)
1148                 return -EINVAL;
1149         if (bus->fd < 0)
1150                 return -ENOTCONN;
1151         if (!m)
1152                 return -EINVAL;
1153         if (!callback)
1154                 return -EINVAL;
1155         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1156                 return -EINVAL;
1157         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1158                 return -EINVAL;
1159
1160         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1161         if (r < 0)
1162                 return r;
1163
1164         if (usec != (uint64_t) -1) {
1165                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1166                 if (r < 0)
1167                         return r;
1168         }
1169
1170         r = bus_seal_message(bus, m);
1171         if (r < 0)
1172                 return r;
1173
1174         c = new(struct reply_callback, 1);
1175         if (!c)
1176                 return -ENOMEM;
1177
1178         c->callback = callback;
1179         c->userdata = userdata;
1180         c->serial = BUS_MESSAGE_SERIAL(m);
1181         c->timeout = calc_elapse(usec);
1182
1183         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1184         if (r < 0) {
1185                 free(c);
1186                 return r;
1187         }
1188
1189         if (c->timeout != 0) {
1190                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1191                 if (r < 0) {
1192                         c->timeout = 0;
1193                         sd_bus_send_with_reply_cancel(bus, c->serial);
1194                         return r;
1195                 }
1196         }
1197
1198         r = sd_bus_send(bus, m, serial);
1199         if (r < 0) {
1200                 sd_bus_send_with_reply_cancel(bus, c->serial);
1201                 return r;
1202         }
1203
1204         return r;
1205 }
1206
1207 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1208         struct reply_callback *c;
1209
1210         if (!bus)
1211                 return -EINVAL;
1212         if (serial == 0)
1213                 return -EINVAL;
1214
1215         c = hashmap_remove(bus->reply_callbacks, &serial);
1216         if (!c)
1217                 return 0;
1218
1219         if (c->timeout != 0)
1220                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1221
1222         free(c);
1223         return 1;
1224 }
1225
1226 static int ensure_running(sd_bus *bus) {
1227         int r;
1228
1229         assert(bus);
1230
1231         r = sd_bus_is_running(bus);
1232         if (r != 0)
1233                 return r;
1234
1235         for (;;) {
1236                 int k;
1237
1238                 r = sd_bus_process(bus, NULL);
1239
1240                 if (r < 0)
1241                         return r;
1242
1243                 k = sd_bus_is_running(bus);
1244                 if (k != 0)
1245                         return k;
1246
1247                 if (r > 0)
1248                         continue;
1249
1250                 r = sd_bus_wait(bus, (uint64_t) -1);
1251                 if (r < 0)
1252                         return r;
1253         }
1254 }
1255
1256 int sd_bus_send_with_reply_and_block(
1257                 sd_bus *bus,
1258                 sd_bus_message *m,
1259                 uint64_t usec,
1260                 sd_bus_error *error,
1261                 sd_bus_message **reply) {
1262
1263         int r;
1264         usec_t timeout;
1265         uint64_t serial;
1266         bool room = false;
1267
1268         if (!bus)
1269                 return -EINVAL;
1270         if (bus->fd < 0)
1271                 return -ENOTCONN;
1272         if (!m)
1273                 return -EINVAL;
1274         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1275                 return -EINVAL;
1276         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1277                 return -EINVAL;
1278         if (bus_error_is_dirty(error))
1279                 return -EINVAL;
1280
1281         r = ensure_running(bus);
1282         if (r < 0)
1283                 return r;
1284
1285         r = sd_bus_send(bus, m, &serial);
1286         if (r < 0)
1287                 return r;
1288
1289         timeout = calc_elapse(usec);
1290
1291         for (;;) {
1292                 usec_t left;
1293                 sd_bus_message *incoming = NULL;
1294
1295                 if (!room) {
1296                         sd_bus_message **q;
1297
1298                         /* Make sure there's room for queuing this
1299                          * locally, before we read the message */
1300
1301                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1302                         if (!q)
1303                                 return -ENOMEM;
1304
1305                         bus->rqueue = q;
1306                         room = true;
1307                 }
1308
1309                 r = message_read(bus, &incoming);
1310                 if (r < 0)
1311                         return r;
1312                 if (incoming) {
1313
1314                         if (incoming->reply_serial == serial) {
1315                                 /* Found a match! */
1316
1317                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1318                                         *reply = incoming;
1319                                         return 0;
1320                                 }
1321
1322                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1323                                         int k;
1324
1325                                         r = sd_bus_error_copy(error, &incoming->error);
1326                                         if (r < 0) {
1327                                                 sd_bus_message_unref(incoming);
1328                                                 return r;
1329                                         }
1330
1331                                         k = bus_error_to_errno(&incoming->error);
1332                                         sd_bus_message_unref(incoming);
1333                                         return k;
1334                                 }
1335
1336                                 sd_bus_message_unref(incoming);
1337                                 return -EIO;
1338                         }
1339
1340                         /* There's already guaranteed to be room for
1341                          * this, so need to resize things here */
1342                         bus->rqueue[bus->rqueue_size ++] = incoming;
1343                         room = false;
1344
1345                         /* Try to read more, right-away */
1346                         continue;
1347                 }
1348                 if (r != 0)
1349                         continue;
1350
1351                 if (timeout > 0) {
1352                         usec_t n;
1353
1354                         n = now(CLOCK_MONOTONIC);
1355                         if (n >= timeout)
1356                                 return -ETIMEDOUT;
1357
1358                         left = timeout - n;
1359                 } else
1360                         left = (uint64_t) -1;
1361
1362                 r = bus_poll(bus, true, left);
1363                 if (r < 0)
1364                         return r;
1365
1366                 r = dispatch_wqueue(bus);
1367                 if (r < 0)
1368                         return r;
1369         }
1370 }
1371
1372 int sd_bus_get_fd(sd_bus *bus) {
1373         if (!bus)
1374                 return -EINVAL;
1375
1376         if (bus->fd < 0)
1377                 return -ENOTCONN;
1378
1379         return bus->fd;
1380 }
1381
1382 int sd_bus_get_events(sd_bus *bus) {
1383         int flags = 0;
1384
1385         if (!bus)
1386                 return -EINVAL;
1387         if (bus->fd < 0)
1388                 return -ENOTCONN;
1389
1390         if (bus->state == BUS_OPENING)
1391                 flags |= POLLOUT;
1392         else if (bus->state == BUS_AUTHENTICATING) {
1393
1394                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1395                         flags |= POLLOUT;
1396
1397                 flags |= POLLIN;
1398
1399         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1400                 if (bus->rqueue_size <= 0)
1401                         flags |= POLLIN;
1402                 if (bus->wqueue_size > 0)
1403                         flags |= POLLOUT;
1404         }
1405
1406         return flags;
1407 }
1408
1409 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1410         struct reply_callback *c;
1411
1412         if (!bus)
1413                 return -EINVAL;
1414         if (!timeout_usec)
1415                 return -EINVAL;
1416         if (bus->fd < 0)
1417                 return -ENOTCONN;
1418
1419         if (bus->state == BUS_AUTHENTICATING) {
1420                 *timeout_usec = bus->auth_timeout;
1421                 return 1;
1422         }
1423
1424         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1425                 return 0;
1426
1427         c = prioq_peek(bus->reply_callbacks_prioq);
1428         if (!c)
1429                 return 0;
1430
1431         *timeout_usec = c->timeout;
1432         return 1;
1433 }
1434
1435 static int process_timeout(sd_bus *bus) {
1436         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1437         struct reply_callback *c;
1438         usec_t n;
1439         int r;
1440
1441         assert(bus);
1442
1443         c = prioq_peek(bus->reply_callbacks_prioq);
1444         if (!c)
1445                 return 0;
1446
1447         n = now(CLOCK_MONOTONIC);
1448         if (c->timeout > n)
1449                 return 0;
1450
1451         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1452         hashmap_remove(bus->reply_callbacks, &c->serial);
1453
1454         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1455         free(c);
1456
1457         return r < 0 ? r : 1;
1458 }
1459
1460 static int process_message(sd_bus *bus, sd_bus_message *m) {
1461         struct filter_callback *l;
1462         int r;
1463
1464         assert(bus);
1465         assert(m);
1466
1467         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1468                 struct reply_callback *c;
1469
1470                 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1471                 if (c) {
1472                         if (c->timeout != 0)
1473                                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1474
1475                         r = c->callback(bus, 0, m, c->userdata);
1476                         free(c);
1477
1478                         if (r != 0)
1479                                 return r;
1480                 }
1481         }
1482
1483         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1484                 r = l->callback(bus, 0, m, l->userdata);
1485                 if (r != 0)
1486                         return r;
1487         }
1488
1489         return 0;
1490 }
1491
1492 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1493         int r;
1494
1495         /* Returns 0 when we didn't do anything. This should cause the
1496          * caller to invoke sd_bus_wait() before returning the next
1497          * time. Returns > 0 when we did something, which possibly
1498          * means *ret is filled in with an unprocessed message. */
1499
1500         if (!bus)
1501                 return -EINVAL;
1502         if (bus->fd < 0)
1503                 return -ENOTCONN;
1504
1505         if (bus->state == BUS_OPENING) {
1506                 struct pollfd p;
1507
1508                 zero(p);
1509                 p.fd = bus->fd;
1510                 p.events = POLLOUT;
1511
1512                 r = poll(&p, 1, 0);
1513                 if (r < 0)
1514                         return -errno;
1515
1516                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1517                         int error = 0;
1518                         socklen_t slen = sizeof(error);
1519
1520                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1521                         if (r < 0)
1522                                 bus->last_connect_error = errno;
1523                         else if (error != 0)
1524                                 bus->last_connect_error = error;
1525                         else if (p.revents & (POLLERR|POLLHUP))
1526                                 bus->last_connect_error = ECONNREFUSED;
1527                         else {
1528                                 r = bus_start_auth(bus);
1529                                 goto null_message;
1530                         }
1531
1532                         /* Try next address */
1533                         r = bus_start_connect(bus);
1534                         goto null_message;
1535                 }
1536
1537                 r = 0;
1538                 goto null_message;
1539
1540         } else if (bus->state == BUS_AUTHENTICATING) {
1541
1542                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1543                         return -ETIMEDOUT;
1544
1545                 r = bus_write_auth(bus);
1546                 if (r != 0)
1547                         goto null_message;
1548
1549                 r = bus_read_auth(bus);
1550                 goto null_message;
1551
1552         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1553                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1554                 int k;
1555
1556                 r = process_timeout(bus);
1557                 if (r != 0)
1558                         goto null_message;
1559
1560                 r = dispatch_wqueue(bus);
1561                 if (r != 0)
1562                         goto null_message;
1563
1564                 k = r;
1565                 r = dispatch_rqueue(bus, &m);
1566                 if (r < 0)
1567                         return r;
1568                 if (!m) {
1569                         if (r == 0)
1570                                 r = k;
1571                         goto null_message;
1572                 }
1573
1574                 r = process_message(bus, m);
1575                 if (r != 0)
1576                         goto null_message;
1577
1578                 if (ret) {
1579                         *ret = m;
1580                         m = NULL;
1581                         return 1;
1582                 }
1583
1584                 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1585                         const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1586                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1587
1588                         r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1589                         if (r < 0)
1590                                 return r;
1591
1592                         r = sd_bus_send(bus, reply, NULL);
1593                         if (r < 0)
1594                                 return r;
1595                 }
1596
1597                 return 1;
1598         }
1599
1600         assert_not_reached("Unknown state");
1601
1602 null_message:
1603         if (r >= 0 && ret)
1604                 *ret = NULL;
1605
1606         return r;
1607 }
1608
1609 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1610         struct pollfd p;
1611         int r, e;
1612         struct timespec ts;
1613         usec_t until, m;
1614
1615         assert(bus);
1616
1617         if (bus->fd < 0)
1618                 return -ENOTCONN;
1619
1620         e = sd_bus_get_events(bus);
1621         if (e < 0)
1622                 return e;
1623
1624         if (need_more)
1625                 e |= POLLIN;
1626
1627         r = sd_bus_get_timeout(bus, &until);
1628         if (r < 0)
1629                 return r;
1630         if (r == 0)
1631                 m = (uint64_t) -1;
1632         else {
1633                 usec_t n;
1634                 n = now(CLOCK_MONOTONIC);
1635                 m = until > n ? until - n : 0;
1636         }
1637
1638         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1639                 m = timeout_usec;
1640
1641         zero(p);
1642         p.fd = bus->fd;
1643         p.events = e;
1644
1645         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1646         if (r < 0)
1647                 return -errno;
1648
1649         return r > 0 ? 1 : 0;
1650 }
1651
1652 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1653
1654         if (!bus)
1655                 return -EINVAL;
1656         if (bus->fd < 0)
1657                 return -ENOTCONN;
1658         if (bus->rqueue_size > 0)
1659                 return 0;
1660
1661         return bus_poll(bus, false, timeout_usec);
1662 }
1663
1664 int sd_bus_flush(sd_bus *bus) {
1665         int r;
1666
1667         if (!bus)
1668                 return -EINVAL;
1669         if (bus->fd < 0)
1670                 return -ENOTCONN;
1671
1672         r = ensure_running(bus);
1673         if (r < 0)
1674                 return r;
1675
1676         if (bus->wqueue_size <= 0)
1677                 return 0;
1678
1679         for (;;) {
1680                 r = dispatch_wqueue(bus);
1681                 if (r < 0)
1682                         return r;
1683
1684                 if (bus->wqueue_size <= 0)
1685                         return 0;
1686
1687                 r = bus_poll(bus, false, (uint64_t) -1);
1688                 if (r < 0)
1689                         return r;
1690         }
1691 }
1692
1693 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1694         struct filter_callback *f;
1695
1696         if (!bus)
1697                 return -EINVAL;
1698         if (!callback)
1699                 return -EINVAL;
1700
1701         f = new(struct filter_callback, 1);
1702         if (!f)
1703                 return -ENOMEM;
1704         f->callback = callback;
1705         f->userdata = userdata;
1706
1707         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1708         return 0;
1709 }
1710
1711 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1712         struct filter_callback *f;
1713
1714         if (!bus)
1715                 return -EINVAL;
1716         if (!callback)
1717                 return -EINVAL;
1718
1719         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1720                 if (f->callback == callback && f->userdata == userdata) {
1721                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1722                         free(f);
1723                         return 1;
1724                 }
1725         }
1726
1727         return 0;
1728 }