chiark / gitweb /
ad840cc875c6e4bf87ab072435f8b620a58527b1
[elogind.git] / src / libsystemd-bus / sd-bus.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <endian.h>
23 #include <assert.h>
24 #include <stdlib.h>
25 #include <unistd.h>
26 #include <netdb.h>
27 #include <sys/poll.h>
28 #include <byteswap.h>
29
30 #include "util.h"
31 #include "macro.h"
32
33 #include "sd-bus.h"
34 #include "bus-internal.h"
35 #include "bus-message.h"
36 #include "bus-type.h"
37
38 #define WQUEUE_MAX 128
39
40 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
41
42 static void bus_free(sd_bus *b) {
43         struct filter_callback *f;
44         unsigned i;
45
46         assert(b);
47
48         if (b->fd >= 0)
49                 close_nointr_nofail(b->fd);
50
51         free(b->rbuffer);
52         free(b->unique_name);
53         free(b->auth_uid);
54         free(b->address);
55
56         for (i = 0; i < b->rqueue_size; i++)
57                 sd_bus_message_unref(b->rqueue[i]);
58         free(b->rqueue);
59
60         for (i = 0; i < b->wqueue_size; i++)
61                 sd_bus_message_unref(b->wqueue[i]);
62         free(b->wqueue);
63
64         hashmap_free_free(b->reply_callbacks);
65         prioq_free(b->reply_callbacks_prioq);
66
67         while ((f = b->filter_callbacks)) {
68                 LIST_REMOVE(struct filter_callback, callbacks, b->filter_callbacks, f);
69                 free(f);
70         }
71
72         free(b);
73 }
74
75 static sd_bus* bus_new(void) {
76         sd_bus *r;
77
78         r = new0(sd_bus, 1);
79         if (!r)
80                 return NULL;
81
82         r->n_ref = 1;
83         r->fd = -1;
84         r->message_version = 1;
85
86         /* We guarantee that wqueue always has space for at least one
87          * entry */
88         r->wqueue = new(sd_bus_message*, 1);
89         if (!r->wqueue) {
90                 free(r);
91                 return NULL;
92         }
93
94         return r;
95 };
96
97 static int hello_callback(sd_bus *bus, int error, sd_bus_message *reply, void *userdata) {
98         const char *s;
99         int r;
100
101         assert(bus);
102
103         if (error != 0)
104                 return -error;
105
106         assert(reply);
107
108         bus->state = BUS_RUNNING;
109
110         r = sd_bus_message_read(reply, "s", &s);
111         if (r < 0)
112                 return r;
113
114         bus->unique_name = strdup(s);
115         if (!bus->unique_name)
116                 return -ENOMEM;
117
118         return 1;
119 }
120
121 static int bus_send_hello(sd_bus *bus) {
122         _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
123         int r;
124
125         assert(bus);
126
127         r = sd_bus_message_new_method_call(
128                         bus,
129                         "org.freedesktop.DBus",
130                         "/",
131                         "org.freedesktop.DBus",
132                         "Hello",
133                         &m);
134         if (r < 0)
135                 return r;
136
137         r = sd_bus_send_with_reply(bus, m, hello_callback, NULL, 0, NULL);
138         if (r < 0)
139                 return r;
140
141         bus->sent_hello = true;
142         return r;
143 }
144
145 static int bus_start_running(sd_bus *bus) {
146         assert(bus);
147
148         if (bus->sent_hello) {
149                 bus->state = BUS_HELLO;
150                 return 1;
151         }
152
153         bus->state = BUS_RUNNING;
154         return 1;
155 }
156
/* Tries to parse one "key=value" element at *p.
 *
 * If *p does not start with "<key>=" nothing is consumed and 0 is
 * returned. Otherwise the value, up to the next ',' or the end of
 * the string, is copied into a newly allocated string with "%XX"
 * escapes decoded, *p is advanced past the value (and past the
 * separating ',' if there is one) and 1 is returned.
 *
 * Returns -EINVAL if *value is already set (the key appeared twice),
 * the negative result of unhexchar() for a broken escape, and
 * -ENOMEM when out of memory. */
static int parse_address_key(const char **p, const char *key, char **value) {
        const char *cursor;
        char *buf = NULL;
        size_t key_len, used = 0;

        assert(p);
        assert(*p);
        assert(key);
        assert(value);

        key_len = strlen(key);
        if (strncmp(*p, key, key_len) != 0 || (*p)[key_len] != '=')
                return 0;

        if (*value)
                return -EINVAL;

        for (cursor = *p + key_len + 1; *cursor != ',' && *cursor != 0;) {
                char decoded, *grown;

                if (*cursor != '%')
                        decoded = *cursor++;
                else {
                        int hi, lo;

                        hi = unhexchar(cursor[1]);
                        if (hi < 0) {
                                free(buf);
                                return hi;
                        }

                        lo = unhexchar(cursor[2]);
                        if (lo < 0) {
                                free(buf);
                                return lo;
                        }

                        decoded = (char) ((hi << 4) | lo);
                        cursor += 3;
                }

                /* Room for the new character plus the trailing NUL */
                grown = realloc(buf, used + 2);
                if (!grown) {
                        free(buf);
                        return -ENOMEM;
                }

                buf = grown;
                buf[used++] = decoded;
        }

        if (buf)
                buf[used] = 0;
        else {
                /* Empty value */
                buf = strdup("");
                if (!buf)
                        return -ENOMEM;
        }

        if (*cursor == ',')
                cursor++;

        *p = cursor;
        *value = buf;
        return 1;
}
227
/* Advances *p past the current address element: up to the next ','
 * and past that comma if there is one, otherwise to the terminating
 * NUL. */
static void skip_address_key(const char **p) {
        const char *end;

        assert(p);
        assert(*p);

        end = *p + strcspn(*p, ",");

        *p = (*end == ',') ? end + 1 : end;
}
237
238 static int bus_parse_next_address(sd_bus *b) {
239         const char *a, *p;
240         _cleanup_free_ char *guid = NULL;
241         int r;
242
243         assert(b);
244
245         if (!b->address)
246                 return 0;
247         if (b->address[b->address_index] == 0)
248                 return 0;
249
250         a = b->address + b->address_index;
251
252         zero(b->sockaddr);
253         b->sockaddr_size = 0;
254         b->peer = SD_ID128_NULL;
255
256         if (startswith(a, "unix:")) {
257                 _cleanup_free_ char *path = NULL, *abstract = NULL;
258
259                 p = a + 5;
260                 while (*p != 0) {
261                         r = parse_address_key(&p, "guid", &guid);
262                         if (r < 0)
263                                 return r;
264                         else if (r > 0)
265                                 continue;
266
267                         r = parse_address_key(&p, "path", &path);
268                         if (r < 0)
269                                 return r;
270                         else if (r > 0)
271                                 continue;
272
273                         r = parse_address_key(&p, "abstract", &abstract);
274                         if (r < 0)
275                                 return r;
276                         else if (r > 0)
277                                 continue;
278
279                         skip_address_key(&p);
280                 }
281
282                 if (!path && !abstract)
283                         return -EINVAL;
284
285                 if (path && abstract)
286                         return -EINVAL;
287
288                 if (path) {
289                         size_t l;
290
291                         l = strlen(path);
292                         if (l > sizeof(b->sockaddr.un.sun_path))
293                                 return -E2BIG;
294
295                         b->sockaddr.un.sun_family = AF_UNIX;
296                         strncpy(b->sockaddr.un.sun_path, path, sizeof(b->sockaddr.un.sun_path));
297                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l;
298                 } else if (abstract) {
299                         size_t l;
300
301                         l = strlen(abstract);
302                         if (l > sizeof(b->sockaddr.un.sun_path) - 1)
303                                 return -E2BIG;
304
305                         b->sockaddr.un.sun_family = AF_UNIX;
306                         b->sockaddr.un.sun_path[0] = 0;
307                         strncpy(b->sockaddr.un.sun_path+1, abstract, sizeof(b->sockaddr.un.sun_path)-1);
308                         b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + 1 + l;
309                 }
310
311         } else if (startswith(a, "tcp:")) {
312                 _cleanup_free_ char *host = NULL, *port = NULL, *family = NULL;
313                 struct addrinfo hints, *result;
314
315                 p = a + 4;
316                 while (*p != 0) {
317                         r = parse_address_key(&p, "guid", &guid);
318                         if (r < 0)
319                                 return r;
320                         else if (r > 0)
321                                 continue;
322
323                         r = parse_address_key(&p, "host", &host);
324                         if (r < 0)
325                                 return r;
326                         else if (r > 0)
327                                 continue;
328
329                         r = parse_address_key(&p, "port", &port);
330                         if (r < 0)
331                                 return r;
332                         else if (r > 0)
333                                 continue;
334
335                         r = parse_address_key(&p, "family", &family);
336                         if (r < 0)
337                                 return r;
338                         else if (r > 0)
339                                 continue;
340
341                         skip_address_key(&p);
342                 }
343
344                 if (!host || !port)
345                         return -EINVAL;
346
347                 zero(hints);
348                 hints.ai_socktype = SOCK_STREAM;
349                 hints.ai_flags = AI_ADDRCONFIG;
350
351                 if (family) {
352                         if (streq(family, "ipv4"))
353                                 hints.ai_family = AF_INET;
354                         else if (streq(family, "ipv6"))
355                                 hints.ai_family = AF_INET6;
356                         else
357                                 return -EINVAL;
358                 }
359
360                 r = getaddrinfo(host, port, &hints, &result);
361                 if (r == EAI_SYSTEM)
362                         return -errno;
363                 else if (r != 0)
364                         return -EADDRNOTAVAIL;
365
366                 memcpy(&b->sockaddr, result->ai_addr, result->ai_addrlen);
367                 b->sockaddr_size = result->ai_addrlen;
368
369                 freeaddrinfo(result);
370         }
371
372         if (guid) {
373                 r = sd_id128_from_string(guid, &b->peer);
374                 if (r < 0)
375                         return r;
376         }
377
378         b->address_index = p - b->address;
379         return 1;
380 }
381
/* Consumes "size" bytes from the iovec array, starting at entry
 * *idx. Entries that are used up completely are reset to
 * NULL/0 and *idx moves on to the next one; an entry that is only
 * partially used up has its base advanced and its length reduced,
 * and *idx stays on it. */
static void iovec_advance(struct iovec *iov, unsigned *idx, size_t size) {
        size_t left;

        for (left = size; left > 0; (*idx)++) {
                struct iovec *cur = &iov[*idx];

                if (left < cur->iov_len) {
                        /* Ends in the middle of this entry */
                        cur->iov_base = (uint8_t*) cur->iov_base + left;
                        cur->iov_len -= left;
                        return;
                }

                left -= cur->iov_len;

                cur->iov_base = NULL;
                cur->iov_len = 0;
        }
}
401
402 static int bus_write_auth(sd_bus *b) {
403         struct msghdr mh;
404         ssize_t k;
405
406         assert(b);
407         assert(b->state == BUS_AUTHENTICATING);
408
409         if (b->auth_index >= ELEMENTSOF(b->auth_iovec))
410                 return 0;
411
412         if (b->auth_timeout == 0)
413                 b->auth_timeout = now(CLOCK_MONOTONIC) + BUS_DEFAULT_TIMEOUT;
414
415         zero(mh);
416         mh.msg_iov = b->auth_iovec + b->auth_index;
417         mh.msg_iovlen = ELEMENTSOF(b->auth_iovec) - b->auth_index;
418
419         k = sendmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
420         if (k < 0)
421                 return errno == EAGAIN ? 0 : -errno;
422
423         iovec_advance(b->auth_iovec, &b->auth_index, (size_t) k);
424
425         return 1;
426 }
427
428 static int bus_auth_verify(sd_bus *b) {
429         char *e, *f;
430         sd_id128_t peer;
431         unsigned i;
432         int r;
433
434         /* We expect two response lines: "OK", "AGREE_UNIX_FD", and
435          * that's it */
436
437         e = memmem(b->rbuffer, b->rbuffer_size, "\r\n", 2);
438         if (!e)
439                 return 0;
440
441         f = memmem(e + 2, b->rbuffer_size - (e - (char*) b->rbuffer) - 2, "\r\n", 2);
442         if (!f)
443                 return 0;
444
445         if (e - (char*) b->rbuffer != 3 + 32)
446                 return -EPERM;
447
448         if (memcmp(b->rbuffer, "OK ", 3))
449                 return -EPERM;
450
451         for (i = 0; i < 32; i += 2) {
452                 int x, y;
453
454                 x = unhexchar(((char*) b->rbuffer)[3 + i]);
455                 y = unhexchar(((char*) b->rbuffer)[3 + i + 1]);
456
457                 if (x < 0 || y < 0)
458                         return -EINVAL;
459
460                 peer.bytes[i/2] = ((uint8_t) x << 4 | (uint8_t) y);
461         }
462
463         if (!sd_id128_equal(b->peer, SD_ID128_NULL) &&
464             !sd_id128_equal(b->peer, peer))
465                 return -EPERM;
466
467         b->peer = peer;
468
469         b->can_fds =
470                 (f - e == sizeof("\r\nAGREE_UNIX_FD") - 1) &&
471                 memcmp(e + 2, "AGREE_UNIX_FD", sizeof("AGREE_UNIX_FD") - 1) == 0;
472
473         b->rbuffer_size -= (f + 2 - (char*) b->rbuffer);
474         memmove(b->rbuffer, f + 2, b->rbuffer_size);
475
476         r = bus_start_running(b);
477         if (r < 0)
478                 return r;
479
480         return 1;
481 }
482
483 static int bus_read_auth(sd_bus *b) {
484         struct msghdr mh;
485         struct iovec iov;
486         size_t n;
487         ssize_t k;
488         int r;
489         void *p;
490
491         assert(b);
492
493         r = bus_auth_verify(b);
494         if (r != 0)
495                 return r;
496
497         n = MAX(3 + 32 + 2 + sizeof("AGREE_UNIX_FD") - 1 + 2, b->rbuffer_size * 2);
498         p = realloc(b->rbuffer, n);
499         if (!p)
500                 return -ENOMEM;
501
502         b->rbuffer = p;
503
504         zero(iov);
505         iov.iov_base = (uint8_t*) b->rbuffer + b->rbuffer_size;
506         iov.iov_len = n - b->rbuffer_size;
507
508         zero(mh);
509         mh.msg_iov = &iov;
510         mh.msg_iovlen = 1;
511
512         k = recvmsg(b->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
513         if (k < 0)
514                 return errno == EAGAIN ? 0 : -errno;
515
516         b->rbuffer_size += k;
517
518         r = bus_auth_verify(b);
519         if (r != 0)
520                 return r;
521
522         return 1;
523 }
524
525 static int bus_start_auth(sd_bus *b) {
526         static const char auth_prefix[] = "\0AUTH EXTERNAL ";
527         static const char auth_suffix[] = "\r\nNEGOTIATE_UNIX_FD\r\nBEGIN\r\n";
528
529         char text[20 + 1]; /* enough space for a 64bit integer plus NUL */
530         size_t l;
531
532         assert(b);
533
534         b->state = BUS_AUTHENTICATING;
535
536         snprintf(text, sizeof(text), "%llu", (unsigned long long) geteuid());
537         char_array_0(text);
538
539         l = strlen(text);
540         b->auth_uid = hexmem(text, l);
541         if (!b->auth_uid)
542                 return -ENOMEM;
543
544         b->auth_iovec[0].iov_base = (void*) auth_prefix;
545         b->auth_iovec[0].iov_len = sizeof(auth_prefix) -1;
546         b->auth_iovec[1].iov_base = (void*) b->auth_uid;
547         b->auth_iovec[1].iov_len = l * 2;
548         b->auth_iovec[2].iov_base = (void*) auth_suffix;
549         b->auth_iovec[2].iov_len = sizeof(auth_suffix) -1;
550         b->auth_size = sizeof(auth_prefix) - 1 + l * 2 + sizeof(auth_suffix) - 1;
551
552         return bus_write_auth(b);
553 }
554
555 static int bus_start_connect(sd_bus *b) {
556         int r;
557
558         assert(b);
559         assert(b->fd < 0);
560
561         for (;;) {
562                 if (b->sockaddr.sa.sa_family == AF_UNSPEC) {
563                         r = bus_parse_next_address(b);
564                         if (r < 0)
565                                 return r;
566                         if (r == 0)
567                                 return b->last_connect_error ? -b->last_connect_error : -ECONNREFUSED;
568                 }
569
570                 b->fd = socket(b->sockaddr.sa.sa_family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
571                 if (b->fd < 0) {
572                         b->last_connect_error = errno;
573                         zero(b->sockaddr);
574                         continue;
575                 }
576
577                 r = connect(b->fd, &b->sockaddr.sa, b->sockaddr_size);
578                 if (r < 0) {
579                         if (errno == EINPROGRESS)
580                                 return 1;
581
582                         b->last_connect_error = errno;
583                         close_nointr_nofail(b->fd);
584                         b->fd = -1;
585                         zero(b->sockaddr);
586                         continue;
587                 }
588
589                 return bus_start_auth(b);
590         }
591 }
592
593 int sd_bus_open_system(sd_bus **ret) {
594         const char *e;
595         sd_bus *b;
596         int r;
597
598         if (!ret)
599                 return -EINVAL;
600
601         e = getenv("DBUS_SYSTEM_BUS_ADDRESS");
602         if (e) {
603                 r = sd_bus_open_address(e, &b);
604                 if (r < 0)
605                         return r;
606         } else {
607                 b = bus_new();
608                 if (!b)
609                         return -ENOMEM;
610
611                 b->sockaddr.un.sun_family = AF_UNIX;
612                 strncpy(b->sockaddr.un.sun_path, "/run/dbus/system_bus_socket", sizeof(b->sockaddr.un.sun_path));
613                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + sizeof("/run/dbus/system_bus_socket") - 1;
614
615                 r = bus_start_connect(b);
616                 if (r < 0) {
617                         bus_free(b);
618                         return r;
619                 }
620         }
621
622         r = bus_send_hello(b);
623         if (r < 0) {
624                 sd_bus_unref(b);
625                 return r;
626         }
627
628         *ret = b;
629         return 0;
630 }
631
632 int sd_bus_open_user(sd_bus **ret) {
633         const char *e;
634         sd_bus *b;
635         size_t l;
636         int r;
637
638         if (!ret)
639                 return -EINVAL;
640
641         e = getenv("DBUS_SESSION_BUS_ADDRESS");
642         if (e) {
643                 r = sd_bus_open_address(e, &b);
644                 if (r < 0)
645                         return r;
646         } else {
647                 e = getenv("XDG_RUNTIME_DIR");
648                 if (!e)
649                         return -ENOENT;
650
651                 l = strlen(e);
652                 if (l + 4 > sizeof(b->sockaddr.un.sun_path))
653                         return -E2BIG;
654
655                 b = bus_new();
656                 if (!b)
657                         return -ENOMEM;
658
659                 b->sockaddr.un.sun_family = AF_UNIX;
660                 memcpy(mempcpy(b->sockaddr.un.sun_path, e, l), "/bus", 4);
661                 b->sockaddr_size = offsetof(struct sockaddr_un, sun_path) + l + 4;
662
663                 r = bus_start_connect(b);
664                 if (r < 0) {
665                         bus_free(b);
666                         return r;
667                 }
668         }
669
670         r = bus_send_hello(b);
671         if (r < 0) {
672                 sd_bus_unref(b);
673                 return r;
674         }
675
676         *ret = b;
677         return 0;
678 }
679
680 int sd_bus_open_address(const char *address, sd_bus **ret) {
681         sd_bus *b;
682         int r;
683
684         if (!address)
685                 return -EINVAL;
686         if (!ret)
687                 return -EINVAL;
688
689         b = bus_new();
690         if (!b)
691                 return -ENOMEM;
692
693         b->address = strdup(address);
694         if (!b->address) {
695                 bus_free(b);
696                 return -ENOMEM;
697         }
698
699         r = bus_start_connect(b);
700         if (r < 0) {
701                 bus_free(b);
702                 return r;
703         }
704
705         *ret = b;
706         return 0;
707 }
708
709 int sd_bus_open_fd(int fd, sd_bus **ret) {
710         sd_bus *b;
711         int r;
712
713         if (fd < 0)
714                 return -EINVAL;
715         if (!ret)
716                 return -EINVAL;
717
718         b = bus_new();
719         if (!b)
720                 return -ENOMEM;
721
722         b->fd = fd;
723         fd_nonblock(b->fd, true);
724         fd_cloexec(b->fd, true);
725
726         r = bus_start_auth(b);
727         if (r < 0) {
728                 bus_free(b);
729                 return r;
730         }
731
732         *ret = b;
733         return 0;
734 }
735
736 void sd_bus_close(sd_bus *bus) {
737         if (!bus)
738                 return;
739         if (bus->fd < 0)
740                 return;
741
742         close_nointr_nofail(bus->fd);
743         bus->fd = -1;
744 }
745
746 sd_bus *sd_bus_ref(sd_bus *bus) {
747         if (!bus)
748                 return NULL;
749
750         assert(bus->n_ref > 0);
751
752         bus->n_ref++;
753         return bus;
754 }
755
756 sd_bus *sd_bus_unref(sd_bus *bus) {
757         if (!bus)
758                 return NULL;
759
760         assert(bus->n_ref > 0);
761         bus->n_ref--;
762
763         if (bus->n_ref <= 0)
764                 bus_free(bus);
765
766         return NULL;
767 }
768
769 int sd_bus_is_open(sd_bus *bus) {
770         if (!bus)
771                 return -EINVAL;
772
773         return bus->fd >= 0;
774 }
775
776 int sd_bus_is_running(sd_bus *bus) {
777         if (!bus)
778                 return -EINVAL;
779
780         if (bus->fd < 0)
781                 return -ENOTCONN;
782
783         return bus->state == BUS_RUNNING;
784 }
785
786 int sd_bus_can_send(sd_bus *bus, char type) {
787
788         if (!bus)
789                 return -EINVAL;
790         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
791                 return -EAGAIN;
792
793         if (type == SD_BUS_TYPE_UNIX_FD)
794                 return bus->can_fds;
795
796         return bus_type_is_valid(type);
797 }
798
799 static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
800         assert(m);
801
802         if (m->header->version > b->message_version)
803                 return -EPERM;
804
805         if (m->sealed)
806                 return 0;
807
808         return bus_message_seal(m, ++b->serial);
809 }
810
811 static int message_write(sd_bus *bus, sd_bus_message *m, size_t *idx) {
812         struct msghdr mh;
813         struct iovec *iov;
814         ssize_t k;
815         size_t n;
816         unsigned j;
817
818         assert(bus);
819         assert(m);
820         assert(idx);
821         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
822
823         if (*idx >= m->size)
824                 return 0;
825
826         n = m->n_iovec * sizeof(struct iovec);
827         iov = alloca(n);
828         memcpy(iov, m->iovec, n);
829
830         j = 0;
831         iovec_advance(iov, &j, *idx);
832
833         zero(mh);
834         mh.msg_iov = iov;
835         mh.msg_iovlen = m->n_iovec;
836
837         k = sendmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
838         if (k < 0)
839                 return errno == EAGAIN ? 0 : -errno;
840
841         *idx += (size_t) k;
842         return 1;
843 }
844
845 static int message_read_need(sd_bus *bus, size_t *need) {
846         uint32_t a, b;
847         uint8_t e;
848
849         assert(bus);
850         assert(need);
851         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
852
853         if (bus->rbuffer_size < sizeof(struct bus_header)) {
854                 *need = sizeof(struct bus_header) + 8;
855
856                 /* Minimum message size:
857                  *
858                  * Header +
859                  *
860                  *  Method Call: +2 string headers
861                  *       Signal: +3 string headers
862                  * Method Error: +1 string headers
863                  *               +1 uint32 headers
864                  * Method Reply: +1 uint32 headers
865                  *
866                  * A string header is at least 9 bytes
867                  * A uint32 header is at least 8 bytes
868                  *
869                  * Hence the minimum message size of a valid message
870                  * is header + 8 bytes */
871
872                 return 0;
873         }
874
875         a = ((const uint32_t*) bus->rbuffer)[1];
876         b = ((const uint32_t*) bus->rbuffer)[3];
877
878         e = ((const uint8_t*) bus->rbuffer)[0];
879         if (e == SD_BUS_LITTLE_ENDIAN) {
880                 a = le32toh(a);
881                 b = le32toh(b);
882         } else if (e == SD_BUS_BIG_ENDIAN) {
883                 a = be32toh(a);
884                 b = be32toh(b);
885         } else
886                 return -EBADMSG;
887
888         *need = sizeof(struct bus_header) + ALIGN_TO(b, 8) + a;
889         return 0;
890 }
891
892 static int message_make(sd_bus *bus, size_t size, sd_bus_message **m) {
893         sd_bus_message *t;
894         void *b = NULL;
895         int r;
896
897         assert(bus);
898         assert(m);
899         assert(bus->rbuffer_size >= size);
900         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
901
902         if (bus->rbuffer_size > size) {
903                 b = memdup((const uint8_t*) bus->rbuffer + size, bus->rbuffer_size - size);
904                 if (!b) {
905                         free(t);
906                         return -ENOMEM;
907                 }
908         }
909
910         r = bus_message_from_malloc(bus->rbuffer, size, &t);
911         if (r < 0) {
912                 free(b);
913                 return r;
914         }
915
916         bus->rbuffer = b;
917         bus->rbuffer_size -= size;
918
919         *m = t;
920         return 1;
921 }
922
923 static int message_read(sd_bus *bus, sd_bus_message **m) {
924         struct msghdr mh;
925         struct iovec iov;
926         ssize_t k;
927         size_t need;
928         int r;
929         void *b;
930
931         assert(bus);
932         assert(m);
933         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
934
935         r = message_read_need(bus, &need);
936         if (r < 0)
937                 return r;
938
939         if (bus->rbuffer_size >= need)
940                 return message_make(bus, need, m);
941
942         b = realloc(bus->rbuffer, need);
943         if (!b)
944                 return -ENOMEM;
945
946         bus->rbuffer = b;
947
948         zero(iov);
949         iov.iov_base = (uint8_t*) bus->rbuffer + bus->rbuffer_size;
950         iov.iov_len = need - bus->rbuffer_size;
951
952         zero(mh);
953         mh.msg_iov = &iov;
954         mh.msg_iovlen = 1;
955
956         k = recvmsg(bus->fd, &mh, MSG_DONTWAIT|MSG_NOSIGNAL);
957         if (k < 0)
958                 return errno == EAGAIN ? 0 : -errno;
959
960         bus->rbuffer_size += k;
961
962         r = message_read_need(bus, &need);
963         if (r < 0)
964                 return r;
965
966         if (bus->rbuffer_size >= need)
967                 return message_make(bus, need, m);
968
969         return 1;
970 }
971
972 static int dispatch_wqueue(sd_bus *bus) {
973         int r, ret = 0;
974
975         assert(bus);
976         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
977
978         if (bus->fd < 0)
979                 return -ENOTCONN;
980
981         while (bus->wqueue_size > 0) {
982
983                 r = message_write(bus, bus->wqueue[0], &bus->windex);
984                 if (r < 0) {
985                         sd_bus_close(bus);
986                         return r;
987                 } else if (r == 0)
988                         /* Didn't do anything this time */
989                         return ret;
990                 else if (bus->windex >= bus->wqueue[0]->size) {
991                         /* Fully written. Let's drop the entry from
992                          * the queue.
993                          *
994                          * This isn't particularly optimized, but
995                          * well, this is supposed to be our worst-case
996                          * buffer only, and the socket buffer is
997                          * supposed to be our primary buffer, and if
998                          * it got full, then all bets are off
999                          * anyway. */
1000
1001                         sd_bus_message_unref(bus->wqueue[0]);
1002                         bus->wqueue_size --;
1003                         memmove(bus->wqueue, bus->wqueue + 1, sizeof(sd_bus_message*) * bus->wqueue_size);
1004                         bus->windex = 0;
1005
1006                         ret = 1;
1007                 }
1008         }
1009
1010         return ret;
1011 }
1012
1013 static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
1014         sd_bus_message *z;
1015         int r, ret = 0;
1016
1017         assert(bus);
1018         assert(m);
1019         assert(bus->state == BUS_RUNNING || bus->state == BUS_HELLO);
1020
1021         if (bus->fd < 0)
1022                 return -ENOTCONN;
1023
1024         if (bus->rqueue_size > 0) {
1025                 /* Dispatch a queued message */
1026
1027                 *m = bus->rqueue[0];
1028                 bus->rqueue_size --;
1029                 memmove(bus->rqueue, bus->rqueue + 1, sizeof(sd_bus_message*) * bus->rqueue_size);
1030                 return 1;
1031         }
1032
1033         /* Try to read a new message */
1034         do {
1035                 r = message_read(bus, &z);
1036                 if (r < 0) {
1037                         sd_bus_close(bus);
1038                         return r;
1039                 }
1040                 if (r == 0)
1041                         return ret;
1042
1043                 r = 1;
1044         } while (!z);
1045
1046         *m = z;
1047         return 1;
1048 }
1049
1050 int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
1051         int r;
1052
1053         if (!bus)
1054                 return -EINVAL;
1055         if (bus->fd < 0)
1056                 return -ENOTCONN;
1057         if (!m)
1058                 return -EINVAL;
1059
1060         /* If the serial number isn't kept, then we know that no reply
1061          * is expected */
1062         if (!serial && !m->sealed)
1063                 m->header->flags |= SD_BUS_MESSAGE_NO_REPLY_EXPECTED;
1064
1065         r = bus_seal_message(bus, m);
1066         if (r < 0)
1067                 return r;
1068
1069         /* If this is a reply and no reply was requested, then let's
1070          * suppress this, if we can */
1071         if (m->dont_send && !serial)
1072                 return 0;
1073
1074         if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
1075                 size_t idx = 0;
1076
1077                 r = message_write(bus, m, &idx);
1078                 if (r < 0) {
1079                         sd_bus_close(bus);
1080                         return r;
1081                 } else if (idx < m->size)  {
1082                         /* Wasn't fully written. So let's remember how
1083                          * much was written. Note that the first entry
1084                          * of the wqueue array is always allocated so
1085                          * that we always can remember how much was
1086                          * written. */
1087                         bus->wqueue[0] = sd_bus_message_ref(m);
1088                         bus->wqueue_size = 1;
1089                         bus->windex = idx;
1090                 }
1091         } else {
1092                 sd_bus_message **q;
1093
1094                 /* Just append it to the queue. */
1095
1096                 if (bus->wqueue_size >= WQUEUE_MAX)
1097                         return -ENOBUFS;
1098
1099                 q = realloc(bus->wqueue, sizeof(sd_bus_message*) * (bus->wqueue_size + 1));
1100                 if (!q)
1101                         return -ENOMEM;
1102
1103                 bus->wqueue = q;
1104                 q[bus->wqueue_size ++] = sd_bus_message_ref(m);
1105         }
1106
1107         if (serial)
1108                 *serial = BUS_MESSAGE_SERIAL(m);
1109
1110         return 0;
1111 }
1112
1113 static usec_t calc_elapse(uint64_t usec) {
1114         if (usec == (uint64_t) -1)
1115                 return 0;
1116
1117         if (usec == 0)
1118                 usec = BUS_DEFAULT_TIMEOUT;
1119
1120         return now(CLOCK_MONOTONIC) + usec;
1121 }
1122
1123 static int timeout_compare(const void *a, const void *b) {
1124         const struct reply_callback *x = a, *y = b;
1125
1126         if (x->timeout != 0 && y->timeout == 0)
1127                 return -1;
1128
1129         if (x->timeout == 0 && y->timeout != 0)
1130                 return 1;
1131
1132         if (x->timeout < y->timeout)
1133                 return -1;
1134
1135         if (x->timeout > y->timeout)
1136                 return 1;
1137
1138         return 0;
1139 }
1140
1141 int sd_bus_send_with_reply(
1142                 sd_bus *bus,
1143                 sd_bus_message *m,
1144                 sd_message_handler_t callback,
1145                 void *userdata,
1146                 uint64_t usec,
1147                 uint64_t *serial) {
1148
1149         struct reply_callback *c;
1150         int r;
1151
1152         if (!bus)
1153                 return -EINVAL;
1154         if (bus->fd < 0)
1155                 return -ENOTCONN;
1156         if (!m)
1157                 return -EINVAL;
1158         if (!callback)
1159                 return -EINVAL;
1160         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1161                 return -EINVAL;
1162         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1163                 return -EINVAL;
1164
1165         r = hashmap_ensure_allocated(&bus->reply_callbacks, uint64_hash_func, uint64_compare_func);
1166         if (r < 0)
1167                 return r;
1168
1169         if (usec != (uint64_t) -1) {
1170                 r = prioq_ensure_allocated(&bus->reply_callbacks_prioq, timeout_compare);
1171                 if (r < 0)
1172                         return r;
1173         }
1174
1175         r = bus_seal_message(bus, m);
1176         if (r < 0)
1177                 return r;
1178
1179         c = new(struct reply_callback, 1);
1180         if (!c)
1181                 return -ENOMEM;
1182
1183         c->callback = callback;
1184         c->userdata = userdata;
1185         c->serial = BUS_MESSAGE_SERIAL(m);
1186         c->timeout = calc_elapse(usec);
1187
1188         r = hashmap_put(bus->reply_callbacks, &c->serial, c);
1189         if (r < 0) {
1190                 free(c);
1191                 return r;
1192         }
1193
1194         if (c->timeout != 0) {
1195                 r = prioq_put(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1196                 if (r < 0) {
1197                         c->timeout = 0;
1198                         sd_bus_send_with_reply_cancel(bus, c->serial);
1199                         return r;
1200                 }
1201         }
1202
1203         r = sd_bus_send(bus, m, serial);
1204         if (r < 0) {
1205                 sd_bus_send_with_reply_cancel(bus, c->serial);
1206                 return r;
1207         }
1208
1209         return r;
1210 }
1211
1212 int sd_bus_send_with_reply_cancel(sd_bus *bus, uint64_t serial) {
1213         struct reply_callback *c;
1214
1215         if (!bus)
1216                 return -EINVAL;
1217         if (serial == 0)
1218                 return -EINVAL;
1219
1220         c = hashmap_remove(bus->reply_callbacks, &serial);
1221         if (!c)
1222                 return 0;
1223
1224         if (c->timeout != 0)
1225                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1226
1227         free(c);
1228         return 1;
1229 }
1230
1231 static int ensure_running(sd_bus *bus) {
1232         int r;
1233
1234         assert(bus);
1235
1236         r = sd_bus_is_running(bus);
1237         if (r != 0)
1238                 return r;
1239
1240         for (;;) {
1241                 int k;
1242
1243                 r = sd_bus_process(bus, NULL);
1244
1245                 if (r < 0)
1246                         return r;
1247
1248                 k = sd_bus_is_running(bus);
1249                 if (k != 0)
1250                         return k;
1251
1252                 if (r > 0)
1253                         continue;
1254
1255                 r = sd_bus_wait(bus, (uint64_t) -1);
1256                 if (r < 0)
1257                         return r;
1258         }
1259 }
1260
1261 int sd_bus_send_with_reply_and_block(
1262                 sd_bus *bus,
1263                 sd_bus_message *m,
1264                 uint64_t usec,
1265                 sd_bus_error *error,
1266                 sd_bus_message **reply) {
1267
1268         int r;
1269         usec_t timeout;
1270         uint64_t serial;
1271         bool room = false;
1272
1273         if (!bus)
1274                 return -EINVAL;
1275         if (bus->fd < 0)
1276                 return -ENOTCONN;
1277         if (!m)
1278                 return -EINVAL;
1279         if (m->header->type != SD_BUS_MESSAGE_TYPE_METHOD_CALL)
1280                 return -EINVAL;
1281         if (m->header->flags & SD_BUS_MESSAGE_NO_REPLY_EXPECTED)
1282                 return -EINVAL;
1283         if (bus_error_is_dirty(error))
1284                 return -EINVAL;
1285
1286         r = ensure_running(bus);
1287         if (r < 0)
1288                 return r;
1289
1290         r = sd_bus_send(bus, m, &serial);
1291         if (r < 0)
1292                 return r;
1293
1294         timeout = calc_elapse(usec);
1295
1296         for (;;) {
1297                 usec_t left;
1298                 sd_bus_message *incoming = NULL;
1299
1300                 if (!room) {
1301                         sd_bus_message **q;
1302
1303                         /* Make sure there's room for queuing this
1304                          * locally, before we read the message */
1305
1306                         q = realloc(bus->rqueue, (bus->rqueue_size + 1) * sizeof(sd_bus_message*));
1307                         if (!q)
1308                                 return -ENOMEM;
1309
1310                         bus->rqueue = q;
1311                         room = true;
1312                 }
1313
1314                 r = message_read(bus, &incoming);
1315                 if (r < 0)
1316                         return r;
1317                 if (incoming) {
1318
1319                         if (incoming->reply_serial == serial) {
1320                                 /* Found a match! */
1321
1322                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1323                                         *reply = incoming;
1324                                         return 0;
1325                                 }
1326
1327                                 if (incoming->header->type == SD_BUS_MESSAGE_TYPE_METHOD_ERROR) {
1328                                         int k;
1329
1330                                         r = sd_bus_error_copy(error, &incoming->error);
1331                                         if (r < 0) {
1332                                                 sd_bus_message_unref(incoming);
1333                                                 return r;
1334                                         }
1335
1336                                         k = bus_error_to_errno(&incoming->error);
1337                                         sd_bus_message_unref(incoming);
1338                                         return k;
1339                                 }
1340
1341                                 sd_bus_message_unref(incoming);
1342                                 return -EIO;
1343                         }
1344
1345                         /* There's already guaranteed to be room for
1346                          * this, so need to resize things here */
1347                         bus->rqueue[bus->rqueue_size ++] = incoming;
1348                         room = false;
1349
1350                         /* Try to read more, right-away */
1351                         continue;
1352                 }
1353                 if (r != 0)
1354                         continue;
1355
1356                 if (timeout > 0) {
1357                         usec_t n;
1358
1359                         n = now(CLOCK_MONOTONIC);
1360                         if (n >= timeout)
1361                                 return -ETIMEDOUT;
1362
1363                         left = timeout - n;
1364                 } else
1365                         left = (uint64_t) -1;
1366
1367                 r = bus_poll(bus, true, left);
1368                 if (r < 0)
1369                         return r;
1370
1371                 r = dispatch_wqueue(bus);
1372                 if (r < 0)
1373                         return r;
1374         }
1375 }
1376
1377 int sd_bus_get_fd(sd_bus *bus) {
1378         if (!bus)
1379                 return -EINVAL;
1380
1381         if (bus->fd < 0)
1382                 return -ENOTCONN;
1383
1384         return bus->fd;
1385 }
1386
1387 int sd_bus_get_events(sd_bus *bus) {
1388         int flags = 0;
1389
1390         if (!bus)
1391                 return -EINVAL;
1392         if (bus->fd < 0)
1393                 return -ENOTCONN;
1394
1395         if (bus->state == BUS_OPENING)
1396                 flags |= POLLOUT;
1397         else if (bus->state == BUS_AUTHENTICATING) {
1398
1399                 if (bus->auth_index < ELEMENTSOF(bus->auth_iovec))
1400                         flags |= POLLOUT;
1401
1402                 flags |= POLLIN;
1403
1404         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1405                 if (bus->rqueue_size <= 0)
1406                         flags |= POLLIN;
1407                 if (bus->wqueue_size > 0)
1408                         flags |= POLLOUT;
1409         }
1410
1411         return flags;
1412 }
1413
1414 int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
1415         struct reply_callback *c;
1416
1417         if (!bus)
1418                 return -EINVAL;
1419         if (!timeout_usec)
1420                 return -EINVAL;
1421         if (bus->fd < 0)
1422                 return -ENOTCONN;
1423
1424         if (bus->state == BUS_AUTHENTICATING) {
1425                 *timeout_usec = bus->auth_timeout;
1426                 return 1;
1427         }
1428
1429         if (bus->state != BUS_RUNNING && bus->state != BUS_HELLO)
1430                 return 0;
1431
1432         c = prioq_peek(bus->reply_callbacks_prioq);
1433         if (!c)
1434                 return 0;
1435
1436         *timeout_usec = c->timeout;
1437         return 1;
1438 }
1439
1440 static int process_timeout(sd_bus *bus) {
1441         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1442         struct reply_callback *c;
1443         usec_t n;
1444         int r;
1445
1446         assert(bus);
1447
1448         c = prioq_peek(bus->reply_callbacks_prioq);
1449         if (!c)
1450                 return 0;
1451
1452         n = now(CLOCK_MONOTONIC);
1453         if (c->timeout > n)
1454                 return 0;
1455
1456         assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
1457         hashmap_remove(bus->reply_callbacks, &c->serial);
1458
1459         r = c->callback(bus, ETIMEDOUT, NULL, c->userdata);
1460         free(c);
1461
1462         return r < 0 ? r : 1;
1463 }
1464
1465 static int process_message(sd_bus *bus, sd_bus_message *m) {
1466         struct filter_callback *l;
1467         int r;
1468
1469         assert(bus);
1470         assert(m);
1471
1472         if (m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_CALL || m->header->type == SD_BUS_MESSAGE_TYPE_METHOD_RETURN) {
1473                 struct reply_callback *c;
1474
1475                 c = hashmap_remove(bus->reply_callbacks, &m->reply_serial);
1476                 if (c) {
1477                         if (c->timeout != 0)
1478                                 prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
1479
1480                         r = c->callback(bus, 0, m, c->userdata);
1481                         free(c);
1482
1483                         if (r != 0)
1484                                 return r;
1485                 }
1486         }
1487
1488         LIST_FOREACH(callbacks, l, bus->filter_callbacks) {
1489                 r = l->callback(bus, 0, m, l->userdata);
1490                 if (r != 0)
1491                         return r;
1492         }
1493
1494         return 0;
1495 }
1496
1497 int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
1498         int r;
1499
1500         /* Returns 0 when we didn't do anything. This should cause the
1501          * caller to invoke sd_bus_wait() before returning the next
1502          * time. Returns > 0 when we did something, which possibly
1503          * means *ret is filled in with an unprocessed message. */
1504
1505         if (!bus)
1506                 return -EINVAL;
1507         if (bus->fd < 0)
1508                 return -ENOTCONN;
1509
1510         if (bus->state == BUS_OPENING) {
1511                 struct pollfd p;
1512
1513                 zero(p);
1514                 p.fd = bus->fd;
1515                 p.events = POLLOUT;
1516
1517                 r = poll(&p, 1, 0);
1518                 if (r < 0)
1519                         return -errno;
1520
1521                 if (p.revents & (POLLOUT|POLLERR|POLLHUP)) {
1522                         int error = 0;
1523                         socklen_t slen = sizeof(error);
1524
1525                         r = getsockopt(bus->fd, SOL_SOCKET, SO_ERROR, &error, &slen);
1526                         if (r < 0)
1527                                 bus->last_connect_error = errno;
1528                         else if (error != 0)
1529                                 bus->last_connect_error = error;
1530                         else if (p.revents & (POLLERR|POLLHUP))
1531                                 bus->last_connect_error = ECONNREFUSED;
1532                         else {
1533                                 r = bus_start_auth(bus);
1534                                 goto null_message;
1535                         }
1536
1537                         /* Try next address */
1538                         r = bus_start_connect(bus);
1539                         goto null_message;
1540                 }
1541
1542                 r = 0;
1543                 goto null_message;
1544
1545         } else if (bus->state == BUS_AUTHENTICATING) {
1546
1547                 if (now(CLOCK_MONOTONIC) >= bus->auth_timeout)
1548                         return -ETIMEDOUT;
1549
1550                 r = bus_write_auth(bus);
1551                 if (r != 0)
1552                         goto null_message;
1553
1554                 r = bus_read_auth(bus);
1555                 goto null_message;
1556
1557         } else if (bus->state == BUS_RUNNING || bus->state == BUS_HELLO) {
1558                 _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
1559                 int k;
1560
1561                 r = process_timeout(bus);
1562                 if (r != 0)
1563                         goto null_message;
1564
1565                 r = dispatch_wqueue(bus);
1566                 if (r != 0)
1567                         goto null_message;
1568
1569                 k = r;
1570                 r = dispatch_rqueue(bus, &m);
1571                 if (r < 0)
1572                         return r;
1573                 if (!m) {
1574                         if (r == 0)
1575                                 r = k;
1576                         goto null_message;
1577                 }
1578
1579                 r = process_message(bus, m);
1580                 if (r != 0)
1581                         goto null_message;
1582
1583                 if (ret) {
1584                         *ret = m;
1585                         m = NULL;
1586                         return 1;
1587                 }
1588
1589                 if (sd_bus_message_is_method_call(m, NULL, NULL)) {
1590                         const sd_bus_error e = SD_BUS_ERROR_INIT_CONST("org.freedesktop.DBus.Error.UnknownObject", "Unknown object.");
1591                         _cleanup_bus_message_unref_ sd_bus_message *reply = NULL;
1592
1593                         r = sd_bus_message_new_method_error(bus, m, &e, &reply);
1594                         if (r < 0)
1595                                 return r;
1596
1597                         r = sd_bus_send(bus, reply, NULL);
1598                         if (r < 0)
1599                                 return r;
1600                 }
1601
1602                 return 1;
1603         }
1604
1605         assert_not_reached("Unknown state");
1606
1607 null_message:
1608         if (r >= 0 && ret)
1609                 *ret = NULL;
1610
1611         return r;
1612 }
1613
1614 static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
1615         struct pollfd p;
1616         int r, e;
1617         struct timespec ts;
1618         usec_t until, m;
1619
1620         assert(bus);
1621
1622         if (bus->fd < 0)
1623                 return -ENOTCONN;
1624
1625         e = sd_bus_get_events(bus);
1626         if (e < 0)
1627                 return e;
1628
1629         if (need_more)
1630                 e |= POLLIN;
1631
1632         r = sd_bus_get_timeout(bus, &until);
1633         if (r < 0)
1634                 return r;
1635         if (r == 0)
1636                 m = (uint64_t) -1;
1637         else {
1638                 usec_t n;
1639                 n = now(CLOCK_MONOTONIC);
1640                 m = until > n ? until - n : 0;
1641         }
1642
1643         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
1644                 m = timeout_usec;
1645
1646         zero(p);
1647         p.fd = bus->fd;
1648         p.events = e;
1649
1650         r = ppoll(&p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
1651         if (r < 0)
1652                 return -errno;
1653
1654         return r > 0 ? 1 : 0;
1655 }
1656
1657 int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
1658
1659         if (!bus)
1660                 return -EINVAL;
1661         if (bus->fd < 0)
1662                 return -ENOTCONN;
1663         if (bus->rqueue_size > 0)
1664                 return 0;
1665
1666         return bus_poll(bus, false, timeout_usec);
1667 }
1668
1669 int sd_bus_flush(sd_bus *bus) {
1670         int r;
1671
1672         if (!bus)
1673                 return -EINVAL;
1674         if (bus->fd < 0)
1675                 return -ENOTCONN;
1676
1677         r = ensure_running(bus);
1678         if (r < 0)
1679                 return r;
1680
1681         if (bus->wqueue_size <= 0)
1682                 return 0;
1683
1684         for (;;) {
1685                 r = dispatch_wqueue(bus);
1686                 if (r < 0)
1687                         return r;
1688
1689                 if (bus->wqueue_size <= 0)
1690                         return 0;
1691
1692                 r = bus_poll(bus, false, (uint64_t) -1);
1693                 if (r < 0)
1694                         return r;
1695         }
1696 }
1697
1698 int sd_bus_add_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1699         struct filter_callback *f;
1700
1701         if (!bus)
1702                 return -EINVAL;
1703         if (!callback)
1704                 return -EINVAL;
1705
1706         f = new(struct filter_callback, 1);
1707         if (!f)
1708                 return -ENOMEM;
1709         f->callback = callback;
1710         f->userdata = userdata;
1711
1712         LIST_PREPEND(struct filter_callback, callbacks, bus->filter_callbacks, f);
1713         return 0;
1714 }
1715
1716 int sd_bus_remove_filter(sd_bus *bus, sd_message_handler_t callback, void *userdata) {
1717         struct filter_callback *f;
1718
1719         if (!bus)
1720                 return -EINVAL;
1721         if (!callback)
1722                 return -EINVAL;
1723
1724         LIST_FOREACH(callbacks, f, bus->filter_callbacks) {
1725                 if (f->callback == callback && f->userdata == userdata) {
1726                         LIST_REMOVE(struct filter_callback, callbacks, bus->filter_callbacks, f);
1727                         free(f);
1728                         return 1;
1729                 }
1730         }
1731
1732         return 0;
1733 }