chiark / gitweb /
sd-rtnl: process - only apply matches to broadcast messages
[elogind.git] / src / libsystemd / sd-rtnl / sd-rtnl.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2013 Tom Gundersen <teg@jklm.no>
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/socket.h>
23 #include <poll.h>
24
25 #include "missing.h"
26 #include "macro.h"
27 #include "util.h"
28 #include "hashmap.h"
29
30 #include "sd-rtnl.h"
31 #include "rtnl-internal.h"
32 #include "rtnl-util.h"
33
34 static int sd_rtnl_new(sd_rtnl **ret) {
35         _cleanup_rtnl_unref_ sd_rtnl *rtnl = NULL;
36
37         assert_return(ret, -EINVAL);
38
39         rtnl = new0(sd_rtnl, 1);
40         if (!rtnl)
41                 return -ENOMEM;
42
43         rtnl->n_ref = REFCNT_INIT;
44
45         rtnl->fd = -1;
46
47         rtnl->sockaddr.nl.nl_family = AF_NETLINK;
48
49         rtnl->original_pid = getpid();
50
51         LIST_HEAD_INIT(rtnl->match_callbacks);
52
53         /* We guarantee that wqueue always has space for at least
54          * one entry */
55         if (!GREEDY_REALLOC(rtnl->wqueue, rtnl->wqueue_allocated, 1))
56                 return -ENOMEM;
57
58         /* We guarantee that the read buffer has at least space for
59          * a message header */
60         if (!greedy_realloc((void**)&rtnl->rbuffer, &rtnl->rbuffer_allocated,
61                             sizeof(struct nlmsghdr), sizeof(uint8_t)))
62                 return -ENOMEM;
63
64         /* Change notification responses have sequence 0, so we must
65          * start our request sequence numbers at 1, or we may confuse our
66          * responses with notifications from the kernel */
67         rtnl->serial = 1;
68
69         *ret = rtnl;
70         rtnl = NULL;
71
72         return 0;
73 }
74
75 int sd_rtnl_new_from_netlink(sd_rtnl **ret, int fd) {
76         _cleanup_rtnl_unref_ sd_rtnl *rtnl = NULL;
77         socklen_t addrlen;
78         int r;
79
80         assert_return(ret, -EINVAL);
81
82         r = sd_rtnl_new(&rtnl);
83         if (r < 0)
84                 return r;
85
86         addrlen = sizeof(rtnl->sockaddr);
87
88         r = getsockname(fd, &rtnl->sockaddr.sa, &addrlen);
89         if (r < 0)
90                 return -errno;
91
92         rtnl->fd = fd;
93
94         *ret = rtnl;
95         rtnl = NULL;
96
97         return 0;
98 }
99
100 static bool rtnl_pid_changed(sd_rtnl *rtnl) {
101         assert(rtnl);
102
103         /* We don't support people creating an rtnl connection and
104          * keeping it around over a fork(). Let's complain. */
105
106         return rtnl->original_pid != getpid();
107 }
108
/* Builds a multicast group bitmask from n_groups group numbers read as
 * 'unsigned' values from ap.
 *
 * Group number g (1..31) sets bit g - 1; a group number of 0 contributes
 * nothing. Any group number >= 32 is rejected with -EINVAL, in which case
 * *_groups is left untouched.
 *
 * Returns 0 on success with the mask stored in *_groups. */
static int rtnl_compute_groups_ap(uint32_t *_groups, unsigned n_groups, va_list ap) {
        uint32_t mask = 0;
        unsigned remaining;

        for (remaining = n_groups; remaining > 0; remaining--) {
                unsigned group = va_arg(ap, unsigned);

                assert_return(group < 32, -EINVAL);

                if (group != 0)
                        mask |= 1 << (group - 1);
        }

        *_groups = mask;

        return 0;
}
126
127 static int rtnl_open_fd_ap(sd_rtnl **ret, int fd, unsigned n_groups, va_list ap) {
128         _cleanup_rtnl_unref_ sd_rtnl *rtnl = NULL;
129         socklen_t addrlen;
130         int r, one = 1;
131
132         assert_return(ret, -EINVAL);
133         assert_return(fd >= 0, -EINVAL);
134
135         r = sd_rtnl_new(&rtnl);
136         if (r < 0)
137                 return r;
138
139         r = setsockopt(fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
140         if (r < 0)
141                 return -errno;
142
143         r = setsockopt(fd, SOL_NETLINK, NETLINK_PKTINFO, &one, sizeof(one));
144         if (r < 0)
145                 return -errno;
146
147         r = rtnl_compute_groups_ap(&rtnl->sockaddr.nl.nl_groups, n_groups, ap);
148         if (r < 0)
149                 return r;
150
151         addrlen = sizeof(rtnl->sockaddr);
152
153         r = bind(fd, &rtnl->sockaddr.sa, addrlen);
154         /* ignore EINVAL to allow opening an already bound socket */
155         if (r < 0 && errno != EINVAL)
156                 return -errno;
157
158         r = getsockname(fd, &rtnl->sockaddr.sa, &addrlen);
159         if (r < 0)
160                 return -errno;
161
162         rtnl->fd = fd;
163
164         *ret = rtnl;
165         rtnl = NULL;
166
167         return 0;
168 }
169
170 int sd_rtnl_open_fd(sd_rtnl **ret, int fd, unsigned n_groups, ...) {
171         va_list ap;
172         int r;
173
174         va_start(ap, n_groups);
175         r = rtnl_open_fd_ap(ret, fd, n_groups, ap);
176         va_end(ap);
177
178         return r;
179 }
180
181 int sd_rtnl_open(sd_rtnl **ret, unsigned n_groups, ...) {
182         va_list ap;
183         int fd, r;
184
185         fd = socket(PF_NETLINK, SOCK_RAW|SOCK_CLOEXEC|SOCK_NONBLOCK, NETLINK_ROUTE);
186         if (fd < 0)
187                 return -errno;
188
189         va_start(ap, n_groups);
190         r = rtnl_open_fd_ap(ret, fd, n_groups, ap);
191         va_end(ap);
192
193         if (r < 0) {
194                 safe_close(fd);
195                 return r;
196         }
197
198         return 0;
199 }
200
201 int sd_rtnl_inc_rcvbuf(const sd_rtnl *const rtnl, const int size) {
202         return fd_inc_rcvbuf(rtnl->fd, size);
203 }
204
205 sd_rtnl *sd_rtnl_ref(sd_rtnl *rtnl) {
206         assert_return(rtnl, NULL);
207         assert_return(!rtnl_pid_changed(rtnl), NULL);
208
209         if (rtnl)
210                 assert_se(REFCNT_INC(rtnl->n_ref) >= 2);
211
212         return rtnl;
213 }
214
215 sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) {
216         if (!rtnl)
217                 return NULL;
218
219         assert_return(!rtnl_pid_changed(rtnl), NULL);
220
221         if (REFCNT_DEC(rtnl->n_ref) == 0) {
222                 struct match_callback *f;
223                 unsigned i;
224
225                 for (i = 0; i < rtnl->rqueue_size; i++)
226                         sd_rtnl_message_unref(rtnl->rqueue[i]);
227                 free(rtnl->rqueue);
228
229                 for (i = 0; i < rtnl->rqueue_partial_size; i++)
230                         sd_rtnl_message_unref(rtnl->rqueue_partial[i]);
231                 free(rtnl->rqueue_partial);
232
233                 for (i = 0; i < rtnl->wqueue_size; i++)
234                         sd_rtnl_message_unref(rtnl->wqueue[i]);
235                 free(rtnl->wqueue);
236
237                 free(rtnl->rbuffer);
238
239                 hashmap_free_free(rtnl->reply_callbacks);
240                 prioq_free(rtnl->reply_callbacks_prioq);
241
242                 sd_event_source_unref(rtnl->io_event_source);
243                 sd_event_source_unref(rtnl->time_event_source);
244                 sd_event_source_unref(rtnl->exit_event_source);
245                 sd_event_unref(rtnl->event);
246
247                 while ((f = rtnl->match_callbacks)) {
248                         LIST_REMOVE(match_callbacks, rtnl->match_callbacks, f);
249                         free(f);
250                 }
251
252                 safe_close(rtnl->fd);
253                 free(rtnl);
254         }
255
256         return NULL;
257 }
258
259 static void rtnl_seal_message(sd_rtnl *rtnl, sd_rtnl_message *m) {
260         assert(rtnl);
261         assert(!rtnl_pid_changed(rtnl));
262         assert(m);
263         assert(m->hdr);
264
265         /* don't use seq == 0, as that is used for broadcasts, so we
266            would get confused by replies to such messages */
267         m->hdr->nlmsg_seq = rtnl->serial++ ? : rtnl->serial++;
268
269         rtnl_message_seal(m);
270
271         return;
272 }
273
274 int sd_rtnl_send(sd_rtnl *nl,
275                  sd_rtnl_message *message,
276                  uint32_t *serial) {
277         int r;
278
279         assert_return(nl, -EINVAL);
280         assert_return(!rtnl_pid_changed(nl), -ECHILD);
281         assert_return(message, -EINVAL);
282         assert_return(!message->sealed, -EPERM);
283
284         rtnl_seal_message(nl, message);
285
286         if (nl->wqueue_size <= 0) {
287                 /* send directly */
288                 r = socket_write_message(nl, message);
289                 if (r < 0)
290                         return r;
291                 else if (r == 0) {
292                         /* nothing was sent, so let's put it on
293                          * the queue */
294                         nl->wqueue[0] = sd_rtnl_message_ref(message);
295                         nl->wqueue_size = 1;
296                 }
297         } else {
298                 /* append to queue */
299                 if (nl->wqueue_size >= RTNL_WQUEUE_MAX) {
300                         log_debug("rtnl: exhausted the write queue size (%d)", RTNL_WQUEUE_MAX);
301                         return -ENOBUFS;
302                 }
303
304                 if (!GREEDY_REALLOC(nl->wqueue, nl->wqueue_allocated, nl->wqueue_size + 1))
305                         return -ENOMEM;
306
307                 nl->wqueue[nl->wqueue_size ++] = sd_rtnl_message_ref(message);
308         }
309
310         if (serial)
311                 *serial = rtnl_message_get_serial(message);
312
313         return 1;
314 }
315
316 int rtnl_rqueue_make_room(sd_rtnl *rtnl) {
317         assert(rtnl);
318
319         if (rtnl->rqueue_size >= RTNL_RQUEUE_MAX) {
320                 log_debug("rtnl: exhausted the read queue size (%d)", RTNL_RQUEUE_MAX);
321                 return -ENOBUFS;
322         }
323
324         if (!GREEDY_REALLOC(rtnl->rqueue, rtnl->rqueue_allocated, rtnl->rqueue_size + 1))
325                 return -ENOMEM;
326
327         return 0;
328 }
329
330 int rtnl_rqueue_partial_make_room(sd_rtnl *rtnl) {
331         assert(rtnl);
332
333         if (rtnl->rqueue_partial_size >= RTNL_RQUEUE_MAX) {
334                 log_debug("rtnl: exhausted the partial read queue size (%d)", RTNL_RQUEUE_MAX);
335                 return -ENOBUFS;
336         }
337
338         if (!GREEDY_REALLOC(rtnl->rqueue_partial, rtnl->rqueue_partial_allocated,
339                             rtnl->rqueue_partial_size + 1))
340                 return -ENOMEM;
341
342         return 0;
343 }
344
345 static int dispatch_rqueue(sd_rtnl *rtnl, sd_rtnl_message **message) {
346         int r;
347
348         assert(rtnl);
349         assert(message);
350
351         if (rtnl->rqueue_size <= 0) {
352                 /* Try to read a new message */
353                 r = socket_read_message(rtnl);
354                 if (r <= 0)
355                         return r;
356         }
357
358         /* Dispatch a queued message */
359         *message = rtnl->rqueue[0];
360         rtnl->rqueue_size --;
361         memmove(rtnl->rqueue, rtnl->rqueue + 1, sizeof(sd_rtnl_message*) * rtnl->rqueue_size);
362
363         return 1;
364 }
365
366 static int dispatch_wqueue(sd_rtnl *rtnl) {
367         int r, ret = 0;
368
369         assert(rtnl);
370
371         while (rtnl->wqueue_size > 0) {
372                 r = socket_write_message(rtnl, rtnl->wqueue[0]);
373                 if (r < 0)
374                         return r;
375                 else if (r == 0)
376                         /* Didn't do anything this time */
377                         return ret;
378                 else {
379                         /* see equivalent in sd-bus.c */
380                         sd_rtnl_message_unref(rtnl->wqueue[0]);
381                         rtnl->wqueue_size --;
382                         memmove(rtnl->wqueue, rtnl->wqueue + 1, sizeof(sd_rtnl_message*) * rtnl->wqueue_size);
383
384                         ret = 1;
385                 }
386         }
387
388         return ret;
389 }
390
391 static int process_timeout(sd_rtnl *rtnl) {
392         _cleanup_rtnl_message_unref_ sd_rtnl_message *m = NULL;
393         struct reply_callback *c;
394         usec_t n;
395         int r;
396
397         assert(rtnl);
398
399         c = prioq_peek(rtnl->reply_callbacks_prioq);
400         if (!c)
401                 return 0;
402
403         n = now(CLOCK_MONOTONIC);
404         if (c->timeout > n)
405                 return 0;
406
407         r = rtnl_message_new_synthetic_error(-ETIMEDOUT, c->serial, &m);
408         if (r < 0)
409                 return r;
410
411         assert_se(prioq_pop(rtnl->reply_callbacks_prioq) == c);
412         hashmap_remove(rtnl->reply_callbacks, &c->serial);
413
414         r = c->callback(rtnl, m, c->userdata);
415         if (r < 0)
416                 log_debug_errno(r, "sd-rtnl: timedout callback failed: %m");
417
418         free(c);
419
420         return 1;
421 }
422
423 static int process_reply(sd_rtnl *rtnl, sd_rtnl_message *m) {
424         struct reply_callback *c;
425         uint64_t serial;
426         int r;
427
428         assert(rtnl);
429         assert(m);
430
431         serial = rtnl_message_get_serial(m);
432         c = hashmap_remove(rtnl->reply_callbacks, &serial);
433         if (!c)
434                 return 0;
435
436         if (c->timeout != 0)
437                 prioq_remove(rtnl->reply_callbacks_prioq, c, &c->prioq_idx);
438
439         r = c->callback(rtnl, m, c->userdata);
440         if (r < 0)
441                 log_debug_errno(r, "sd-rtnl: callback failed: %m");
442
443         free(c);
444
445         return 1;
446 }
447
448 static int process_match(sd_rtnl *rtnl, sd_rtnl_message *m) {
449         struct match_callback *c;
450         uint16_t type;
451         int r;
452
453         assert(rtnl);
454         assert(m);
455
456         r = sd_rtnl_message_get_type(m, &type);
457         if (r < 0)
458                 return r;
459
460         LIST_FOREACH(match_callbacks, c, rtnl->match_callbacks) {
461                 if (type == c->type) {
462                         r = c->callback(rtnl, m, c->userdata);
463                         if (r != 0) {
464                                 if (r < 0)
465                                         log_debug_errno(r, "sd-rtnl: match callback failed: %m");
466
467                                 break;
468                         }
469                 }
470         }
471
472         return 1;
473 }
474
475 static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) {
476         _cleanup_rtnl_message_unref_ sd_rtnl_message *m = NULL;
477         int r;
478
479         assert(rtnl);
480
481         r = process_timeout(rtnl);
482         if (r != 0)
483                 goto null_message;
484
485         r = dispatch_wqueue(rtnl);
486         if (r != 0)
487                 goto null_message;
488
489         r = dispatch_rqueue(rtnl, &m);
490         if (r < 0)
491                 return r;
492         if (!m)
493                 goto null_message;
494
495         if (sd_rtnl_message_is_broadcast(m)) {
496                 r = process_match(rtnl, m);
497                 if (r != 0)
498                         goto null_message;
499         } else {
500                 r = process_reply(rtnl, m);
501                 if (r != 0)
502                         goto null_message;
503         }
504
505         if (ret) {
506                 *ret = m;
507                 m = NULL;
508
509                 return 1;
510         }
511
512         return 1;
513
514 null_message:
515         if (r >= 0 && ret)
516                 *ret = NULL;
517
518         return r;
519 }
520
521 int sd_rtnl_process(sd_rtnl *rtnl, sd_rtnl_message **ret) {
522         RTNL_DONT_DESTROY(rtnl);
523         int r;
524
525         assert_return(rtnl, -EINVAL);
526         assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
527         assert_return(!rtnl->processing, -EBUSY);
528
529         rtnl->processing = true;
530         r = process_running(rtnl, ret);
531         rtnl->processing = false;
532
533         return r;
534 }
535
536 static usec_t calc_elapse(uint64_t usec) {
537         if (usec == (uint64_t) -1)
538                 return 0;
539
540         if (usec == 0)
541                 usec = RTNL_DEFAULT_TIMEOUT;
542
543         return now(CLOCK_MONOTONIC) + usec;
544 }
545
546 static int rtnl_poll(sd_rtnl *rtnl, bool need_more, uint64_t timeout_usec) {
547         struct pollfd p[1] = {};
548         struct timespec ts;
549         usec_t m = USEC_INFINITY;
550         int r, e;
551
552         assert(rtnl);
553
554         e = sd_rtnl_get_events(rtnl);
555         if (e < 0)
556                 return e;
557
558         if (need_more)
559                 /* Caller wants more data, and doesn't care about
560                  * what's been read or any other timeouts. */
561                 e |= POLLIN;
562         else {
563                 usec_t until;
564                 /* Caller wants to process if there is something to
565                  * process, but doesn't care otherwise */
566
567                 r = sd_rtnl_get_timeout(rtnl, &until);
568                 if (r < 0)
569                         return r;
570                 if (r > 0) {
571                         usec_t nw;
572                         nw = now(CLOCK_MONOTONIC);
573                         m = until > nw ? until - nw : 0;
574                 }
575         }
576
577         if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m))
578                 m = timeout_usec;
579
580         p[0].fd = rtnl->fd;
581         p[0].events = e;
582
583         r = ppoll(p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL);
584         if (r < 0)
585                 return -errno;
586
587         return r > 0 ? 1 : 0;
588 }
589
590 int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout_usec) {
591         assert_return(nl, -EINVAL);
592         assert_return(!rtnl_pid_changed(nl), -ECHILD);
593
594         if (nl->rqueue_size > 0)
595                 return 0;
596
597         return rtnl_poll(nl, false, timeout_usec);
598 }
599
600 static int timeout_compare(const void *a, const void *b) {
601         const struct reply_callback *x = a, *y = b;
602
603         if (x->timeout != 0 && y->timeout == 0)
604                 return -1;
605
606         if (x->timeout == 0 && y->timeout != 0)
607                 return 1;
608
609         if (x->timeout < y->timeout)
610                 return -1;
611
612         if (x->timeout > y->timeout)
613                 return 1;
614
615         return 0;
616 }
617
618 int sd_rtnl_call_async(sd_rtnl *nl,
619                        sd_rtnl_message *m,
620                        sd_rtnl_message_handler_t callback,
621                        void *userdata,
622                        uint64_t usec,
623                        uint32_t *serial) {
624         struct reply_callback *c;
625         uint32_t s;
626         int r, k;
627
628         assert_return(nl, -EINVAL);
629         assert_return(m, -EINVAL);
630         assert_return(callback, -EINVAL);
631         assert_return(!rtnl_pid_changed(nl), -ECHILD);
632
633         r = hashmap_ensure_allocated(&nl->reply_callbacks, &uint64_hash_ops);
634         if (r < 0)
635                 return r;
636
637         if (usec != (uint64_t) -1) {
638                 r = prioq_ensure_allocated(&nl->reply_callbacks_prioq, timeout_compare);
639                 if (r < 0)
640                         return r;
641         }
642
643         c = new0(struct reply_callback, 1);
644         if (!c)
645                 return -ENOMEM;
646
647         c->callback = callback;
648         c->userdata = userdata;
649         c->timeout = calc_elapse(usec);
650
651         k = sd_rtnl_send(nl, m, &s);
652         if (k < 0) {
653                 free(c);
654                 return k;
655         }
656
657         c->serial = s;
658
659         r = hashmap_put(nl->reply_callbacks, &c->serial, c);
660         if (r < 0) {
661                 free(c);
662                 return r;
663         }
664
665         if (c->timeout != 0) {
666                 r = prioq_put(nl->reply_callbacks_prioq, c, &c->prioq_idx);
667                 if (r > 0) {
668                         c->timeout = 0;
669                         sd_rtnl_call_async_cancel(nl, c->serial);
670                         return r;
671                 }
672         }
673
674         if (serial)
675                 *serial = s;
676
677         return k;
678 }
679
680 int sd_rtnl_call_async_cancel(sd_rtnl *nl, uint32_t serial) {
681         struct reply_callback *c;
682         uint64_t s = serial;
683
684         assert_return(nl, -EINVAL);
685         assert_return(serial != 0, -EINVAL);
686         assert_return(!rtnl_pid_changed(nl), -ECHILD);
687
688         c = hashmap_remove(nl->reply_callbacks, &s);
689         if (!c)
690                 return 0;
691
692         if (c->timeout != 0)
693                 prioq_remove(nl->reply_callbacks_prioq, c, &c->prioq_idx);
694
695         free(c);
696         return 1;
697 }
698
699 int sd_rtnl_call(sd_rtnl *rtnl,
700                 sd_rtnl_message *message,
701                 uint64_t usec,
702                 sd_rtnl_message **ret) {
703         usec_t timeout;
704         uint32_t serial;
705         unsigned i = 0;
706         int r;
707
708         assert_return(rtnl, -EINVAL);
709         assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
710         assert_return(message, -EINVAL);
711
712         r = sd_rtnl_send(rtnl, message, &serial);
713         if (r < 0)
714                 return r;
715
716         timeout = calc_elapse(usec);
717
718         for (;;) {
719                 usec_t left;
720
721                 while (i < rtnl->rqueue_size) {
722                         sd_rtnl_message *incoming;
723                         uint32_t received_serial;
724
725                         incoming = rtnl->rqueue[i];
726                         received_serial = rtnl_message_get_serial(incoming);
727
728                         if (received_serial == serial) {
729                                 /* found a match, remove from rqueue and return it */
730                                 memmove(rtnl->rqueue + i,rtnl->rqueue + i + 1,
731                                         sizeof(sd_rtnl_message*) * (rtnl->rqueue_size - i - 1));
732                                 rtnl->rqueue_size--;
733
734                                 r = sd_rtnl_message_get_errno(incoming);
735                                 if (r < 0) {
736                                         sd_rtnl_message_unref(incoming);
737                                         return r;
738                                 }
739
740                                 if (ret) {
741                                         *ret = incoming;
742                                 } else
743                                         sd_rtnl_message_unref(incoming);
744
745                                 return 1;
746                         }
747
748                         /* Try to read more, right away */
749                         i ++;
750                 }
751
752                 r = socket_read_message(rtnl);
753                 if (r < 0)
754                         return r;
755                 if (r > 0)
756                         /* received message, so try to process straight away */
757                         continue;
758
759                 if (timeout > 0) {
760                         usec_t n;
761
762                         n = now(CLOCK_MONOTONIC);
763                         if (n >= timeout)
764                                 return -ETIMEDOUT;
765
766                         left = timeout - n;
767                 } else
768                         left = (uint64_t) -1;
769
770                 r = rtnl_poll(rtnl, true, left);
771                 if (r < 0)
772                         return r;
773                 else if (r == 0)
774                         return -ETIMEDOUT;
775
776                 r = dispatch_wqueue(rtnl);
777                 if (r < 0)
778                         return r;
779         }
780 }
781
782 int sd_rtnl_flush(sd_rtnl *rtnl) {
783         int r;
784
785         assert_return(rtnl, -EINVAL);
786         assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
787
788         if (rtnl->wqueue_size <= 0)
789                 return 0;
790
791         for (;;) {
792                 r = dispatch_wqueue(rtnl);
793                 if (r < 0)
794                         return r;
795
796                 if (rtnl->wqueue_size <= 0)
797                         return 0;
798
799                 r = rtnl_poll(rtnl, false, (uint64_t) -1);
800                 if (r < 0)
801                         return r;
802         }
803 }
804
805 int sd_rtnl_get_events(sd_rtnl *rtnl) {
806         int flags = 0;
807
808         assert_return(rtnl, -EINVAL);
809         assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
810
811         if (rtnl->rqueue_size <= 0)
812                 flags |= POLLIN;
813         if (rtnl->wqueue_size > 0)
814                 flags |= POLLOUT;
815
816         return flags;
817 }
818
819 int sd_rtnl_get_timeout(sd_rtnl *rtnl, uint64_t *timeout_usec) {
820         struct reply_callback *c;
821
822         assert_return(rtnl, -EINVAL);
823         assert_return(timeout_usec, -EINVAL);
824         assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
825
826         if (rtnl->rqueue_size > 0) {
827                 *timeout_usec = 0;
828                 return 1;
829         }
830
831         c = prioq_peek(rtnl->reply_callbacks_prioq);
832         if (!c) {
833                 *timeout_usec = (uint64_t) -1;
834                 return 0;
835         }
836
837         *timeout_usec = c->timeout;
838
839         return 1;
840 }
841
842 static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
843         sd_rtnl *rtnl = userdata;
844         int r;
845
846         assert(rtnl);
847
848         r = sd_rtnl_process(rtnl, NULL);
849         if (r < 0)
850                 return r;
851
852         return 1;
853 }
854
855 static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) {
856         sd_rtnl *rtnl = userdata;
857         int r;
858
859         assert(rtnl);
860
861         r = sd_rtnl_process(rtnl, NULL);
862         if (r < 0)
863                 return r;
864
865         return 1;
866 }
867
868 static int prepare_callback(sd_event_source *s, void *userdata) {
869         sd_rtnl *rtnl = userdata;
870         int r, e;
871         usec_t until;
872
873         assert(s);
874         assert(rtnl);
875
876         e = sd_rtnl_get_events(rtnl);
877         if (e < 0)
878                 return e;
879
880         r = sd_event_source_set_io_events(rtnl->io_event_source, e);
881         if (r < 0)
882                 return r;
883
884         r = sd_rtnl_get_timeout(rtnl, &until);
885         if (r < 0)
886                 return r;
887         if (r > 0) {
888                 int j;
889
890                 j = sd_event_source_set_time(rtnl->time_event_source, until);
891                 if (j < 0)
892                         return j;
893         }
894
895         r = sd_event_source_set_enabled(rtnl->time_event_source, r > 0);
896         if (r < 0)
897                 return r;
898
899         return 1;
900 }
901
902 static int exit_callback(sd_event_source *event, void *userdata) {
903         sd_rtnl *rtnl = userdata;
904
905         assert(event);
906
907         sd_rtnl_flush(rtnl);
908
909         return 1;
910 }
911
912 int sd_rtnl_attach_event(sd_rtnl *rtnl, sd_event *event, int priority) {
913         int r;
914
915         assert_return(rtnl, -EINVAL);
916         assert_return(!rtnl->event, -EBUSY);
917
918         assert(!rtnl->io_event_source);
919         assert(!rtnl->time_event_source);
920
921         if (event)
922                 rtnl->event = sd_event_ref(event);
923         else {
924                 r = sd_event_default(&rtnl->event);
925                 if (r < 0)
926                         return r;
927         }
928
929         r = sd_event_add_io(rtnl->event, &rtnl->io_event_source, rtnl->fd, 0, io_callback, rtnl);
930         if (r < 0)
931                 goto fail;
932
933         r = sd_event_source_set_priority(rtnl->io_event_source, priority);
934         if (r < 0)
935                 goto fail;
936
937         r = sd_event_source_set_description(rtnl->io_event_source, "rtnl-receive-message");
938         if (r < 0)
939                 goto fail;
940
941         r = sd_event_source_set_prepare(rtnl->io_event_source, prepare_callback);
942         if (r < 0)
943                 goto fail;
944
945         r = sd_event_add_time(rtnl->event, &rtnl->time_event_source, CLOCK_MONOTONIC, 0, 0, time_callback, rtnl);
946         if (r < 0)
947                 goto fail;
948
949         r = sd_event_source_set_priority(rtnl->time_event_source, priority);
950         if (r < 0)
951                 goto fail;
952
953         r = sd_event_source_set_description(rtnl->time_event_source, "rtnl-timer");
954         if (r < 0)
955                 goto fail;
956
957         r = sd_event_add_exit(rtnl->event, &rtnl->exit_event_source, exit_callback, rtnl);
958         if (r < 0)
959                 goto fail;
960
961         r = sd_event_source_set_description(rtnl->exit_event_source, "rtnl-exit");
962         if (r < 0)
963                 goto fail;
964
965         return 0;
966
967 fail:
968         sd_rtnl_detach_event(rtnl);
969         return r;
970 }
971
972 int sd_rtnl_detach_event(sd_rtnl *rtnl) {
973         assert_return(rtnl, -EINVAL);
974         assert_return(rtnl->event, -ENXIO);
975
976         if (rtnl->io_event_source)
977                 rtnl->io_event_source = sd_event_source_unref(rtnl->io_event_source);
978
979         if (rtnl->time_event_source)
980                 rtnl->time_event_source = sd_event_source_unref(rtnl->time_event_source);
981
982         if (rtnl->exit_event_source)
983                 rtnl->exit_event_source = sd_event_source_unref(rtnl->exit_event_source);
984
985         if (rtnl->event)
986                 rtnl->event = sd_event_unref(rtnl->event);
987
988         return 0;
989 }
990
991 int sd_rtnl_add_match(sd_rtnl *rtnl,
992                       uint16_t type,
993                       sd_rtnl_message_handler_t callback,
994                       void *userdata) {
995         struct match_callback *c;
996
997         assert_return(rtnl, -EINVAL);
998         assert_return(callback, -EINVAL);
999         assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
1000         assert_return(rtnl_message_type_is_link(type) ||
1001                       rtnl_message_type_is_addr(type) ||
1002                       rtnl_message_type_is_route(type), -EOPNOTSUPP);
1003
1004         c = new0(struct match_callback, 1);
1005         if (!c)
1006                 return -ENOMEM;
1007
1008         c->callback = callback;
1009         c->type = type;
1010         c->userdata = userdata;
1011
1012         LIST_PREPEND(match_callbacks, rtnl->match_callbacks, c);
1013
1014         return 0;
1015 }
1016
1017 int sd_rtnl_remove_match(sd_rtnl *rtnl,
1018                          uint16_t type,
1019                          sd_rtnl_message_handler_t callback,
1020                          void *userdata) {
1021         struct match_callback *c;
1022
1023         assert_return(rtnl, -EINVAL);
1024         assert_return(callback, -EINVAL);
1025         assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
1026
1027         LIST_FOREACH(match_callbacks, c, rtnl->match_callbacks)
1028                 if (c->callback == callback && c->type == type && c->userdata == userdata) {
1029                         LIST_REMOVE(match_callbacks, rtnl->match_callbacks, c);
1030                         free(c);
1031
1032                         return 1;
1033                 }
1034
1035         return 0;
1036 }