X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?p=elogind.git;a=blobdiff_plain;f=src%2Flibsystemd-rtnl%2Fsd-rtnl.c;h=08b82ab2a94f3f6dca6bb5e21934d90cbb48cfcc;hp=8ea11df808d2981dda7c59cedd141e3ed542b035;hb=eca7a27add5aa3ec85c272fc4cef287fdca034ce;hpb=a33dece5f8cce7f1946263bec76068ef84abc07b diff --git a/src/libsystemd-rtnl/sd-rtnl.c b/src/libsystemd-rtnl/sd-rtnl.c index 8ea11df80..08b82ab2a 100644 --- a/src/libsystemd-rtnl/sd-rtnl.c +++ b/src/libsystemd-rtnl/sd-rtnl.c @@ -24,6 +24,7 @@ #include "macro.h" #include "util.h" +#include "hashmap.h" #include "sd-rtnl.h" #include "rtnl-internal.h" @@ -46,6 +47,16 @@ static int sd_rtnl_new(sd_rtnl **ret) { rtnl->original_pid = getpid(); + LIST_HEAD_INIT(rtnl->match_callbacks); + + /* We guarantee that wqueue always has space for at least + * one entry */ + rtnl->wqueue = new(sd_rtnl_message*, 1); + if (!rtnl->wqueue) { + free(rtnl); + return -ENOMEM; + } + *ret = rtnl; return 0; } @@ -64,6 +75,8 @@ int sd_rtnl_open(uint32_t groups, sd_rtnl **ret) { socklen_t addrlen; int r; + assert_return(ret, -EINVAL); + r = sd_rtnl_new(&rtnl); if (r < 0) return r; @@ -98,23 +111,40 @@ sd_rtnl *sd_rtnl_ref(sd_rtnl *rtnl) { } sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) { + if (rtnl && REFCNT_DEC(rtnl->n_ref) <= 0) { + struct match_callback *f; + unsigned i; + + for (i = 0; i < rtnl->rqueue_size; i++) + sd_rtnl_message_unref(rtnl->rqueue[i]); + free(rtnl->rqueue); + + for (i = 0; i < rtnl->wqueue_size; i++) + sd_rtnl_message_unref(rtnl->wqueue[i]); + free(rtnl->wqueue); + + hashmap_free_free(rtnl->reply_callbacks); + prioq_free(rtnl->reply_callbacks_prioq); + + while ((f = rtnl->match_callbacks)) { + LIST_REMOVE(match_callbacks, rtnl->match_callbacks, f); + free(f); + } + if (rtnl->fd >= 0) close_nointr_nofail(rtnl->fd); + free(rtnl); } return NULL; } -int sd_rtnl_send_with_reply_and_block(sd_rtnl *nl, - sd_rtnl_message *message, - uint64_t usec, - sd_rtnl_message **ret) { - struct pollfd p[1] = {}; - struct timespec left; - usec_t timeout; - int r, serial; +int sd_rtnl_send(sd_rtnl *nl, + sd_rtnl_message *message, + uint32_t *serial) { + int r; assert_return(nl, -EINVAL); assert_return(!rtnl_pid_changed(nl), -ECHILD); @@ -124,82 +154,717 @@ int sd_rtnl_send_with_reply_and_block(sd_rtnl *nl, if (r < 0) return r; - serial = message_get_serial(message); + if (nl->wqueue_size <= 0) { + /* send directly */ + r = socket_write_message(nl, message); + if (r < 0) + return r; + else if (r == 0) { + /* nothing was sent, so let's put it on + * the queue */ + nl->wqueue[0] = sd_rtnl_message_ref(message); + nl->wqueue_size = 1; + } + } else { + sd_rtnl_message **q; - p[0].fd = nl->fd; - p[0].events = POLLOUT; + /* append to queue */ + if (nl->wqueue_size >= RTNL_WQUEUE_MAX) + return -ENOBUFS; - if (usec == (uint64_t) -1) - timeout = 0; - else if (usec == 0) - timeout = now(CLOCK_MONOTONIC) + RTNL_DEFAULT_TIMEOUT; - else - timeout = now(CLOCK_MONOTONIC) + usec; + q = realloc(nl->wqueue, sizeof(sd_rtnl_message*) * (nl->wqueue_size + 1)); + if (!q) + return -ENOMEM; - for (;;) { - if (timeout) { - usec_t n; + nl->wqueue = q; + q[nl->wqueue_size ++] = sd_rtnl_message_ref(message); + } - n = now(CLOCK_MONOTONIC); - if (n >= timeout) - return -ETIMEDOUT; + if (serial) + *serial = message_get_serial(message); - timespec_store(&left, timeout - n); - } + return 1; +} - r = ppoll(p, 1, timeout ? &left : NULL, NULL); - if (r < 0) - return 0; +static int dispatch_rqueue(sd_rtnl *rtnl, sd_rtnl_message **message) { + sd_rtnl_message *z = NULL; + int r; - r = socket_write_message(nl, message); + assert(rtnl); + assert(message); + + if (rtnl->rqueue_size > 0) { + /* Dispatch a queued message */ + + *message = rtnl->rqueue[0]; + rtnl->rqueue_size --; + memmove(rtnl->rqueue, rtnl->rqueue + 1, sizeof(sd_rtnl_message*) * rtnl->rqueue_size); + + return 1; + } + + /* Try to read a new message */ + r = socket_read_message(rtnl, &z); + if (r < 0) + return r; + if (r == 0) + return 0; + + *message = z; + + return 1; +} + +static int dispatch_wqueue(sd_rtnl *rtnl) { + int r, ret = 0; + + assert(rtnl); + + while (rtnl->wqueue_size > 0) { + r = socket_write_message(rtnl, rtnl->wqueue[0]); if (r < 0) return r; - - if (r > 0) { - break; + else if (r == 0) + /* Didn't do anything this time */ + return ret; + else { + /* see equivalent in sd-bus.c */ + sd_rtnl_message_unref(rtnl->wqueue[0]); + rtnl->wqueue_size --; + memmove(rtnl->wqueue, rtnl->wqueue + 1, sizeof(sd_rtnl_message*) * rtnl->wqueue_size); + + ret = 1; } } - p[0].events = POLLIN; + return ret; +} - for (;;) { - _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *reply = NULL; +static int process_timeout(sd_rtnl *rtnl) { + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL; + struct reply_callback *c; + usec_t n; + int r; - if (timeout) { - usec_t n; + assert(rtnl); - n = now(CLOCK_MONOTONIC); - if (n >= timeout) - return -ETIMEDOUT; + c = prioq_peek(rtnl->reply_callbacks_prioq); + if (!c) + return 0; + + n = now(CLOCK_MONOTONIC); + if (c->timeout > n) + return 0; - timespec_store(&left, timeout - n); + r = message_new_synthetic_error(-ETIMEDOUT, c->serial, &m); + if (r < 0) + return r; + + assert_se(prioq_pop(rtnl->reply_callbacks_prioq) == c); + hashmap_remove(rtnl->reply_callbacks, &c->serial); + + r = c->callback(rtnl, m, c->userdata); + free(c); + + return r < 0 ? r : 1; +} + +static int process_reply(sd_rtnl *rtnl, sd_rtnl_message *m) { + struct reply_callback *c; + uint64_t serial; + int r; + + assert(rtnl); + assert(m); + + serial = message_get_serial(m); + c = hashmap_remove(rtnl->reply_callbacks, &serial); + if (!c) + return 0; + + if (c->timeout != 0) + prioq_remove(rtnl->reply_callbacks_prioq, c, &c->prioq_idx); + + r = c->callback(rtnl, m, c->userdata); + free(c); + + return r; +} + +static int process_match(sd_rtnl *rtnl, sd_rtnl_message *m) { + struct match_callback *c; + uint16_t type; + int r; + + assert(rtnl); + assert(m); + + r = sd_rtnl_message_get_type(m, &type); + if (r < 0) + return r; + + LIST_FOREACH(match_callbacks, c, rtnl->match_callbacks) { + if (type == c->type) { + r = c->callback(rtnl, m, c->userdata); + if (r != 0) + return r; } + } + + return 0; +} + +static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) { + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL; + int r; + + assert(rtnl); + + r = process_timeout(rtnl); + if (r != 0) + goto null_message; + + r = dispatch_wqueue(rtnl); + if (r != 0) + goto null_message; + + r = dispatch_rqueue(rtnl, &m); + if (r < 0) + return r; + if (!m) + goto null_message; + + r = process_reply(rtnl, m); + if (r != 0) + goto null_message; + + r = process_match(rtnl, m); + if (r != 0) + goto null_message; + + if (ret) { + *ret = m; + m = NULL; + + return 1; + } + + return 1; - r = ppoll(p, 1, timeout ? &left : NULL, NULL); +null_message: + if (r >= 0 && ret) + *ret = NULL; + + return r; +} + +int sd_rtnl_process(sd_rtnl *rtnl, sd_rtnl_message **ret) { + RTNL_DONT_DESTROY(rtnl); + int r; + + assert_return(rtnl, -EINVAL); + assert_return(!rtnl_pid_changed(rtnl), -ECHILD); + assert_return(!rtnl->processing, -EBUSY); + + rtnl->processing = true; + r = process_running(rtnl, ret); + rtnl->processing = false; + + return r; +} + +static usec_t calc_elapse(uint64_t usec) { + if (usec == (uint64_t) -1) + return 0; + + if (usec == 0) + usec = RTNL_DEFAULT_TIMEOUT; + + return now(CLOCK_MONOTONIC) + usec; +} + +static int rtnl_poll(sd_rtnl *rtnl, bool need_more, uint64_t timeout_usec) { + struct pollfd p[1] = {}; + struct timespec ts; + usec_t m = (usec_t) -1; + int r, e; + + assert(rtnl); + + e = sd_rtnl_get_events(rtnl); + if (e < 0) + return e; + + if (need_more) + /* Caller wants more data, and doesn't care about + * what's been read or any other timeouts. */ + return e |= POLLIN; + else { + usec_t until; + /* Caller wants to process if there is something to + * process, but doesn't care otherwise */ + + r = sd_rtnl_get_timeout(rtnl, &until); if (r < 0) return r; + if (r > 0) { + usec_t nw; + nw = now(CLOCK_MONOTONIC); + m = until > nw ? until - nw : 0; + } + } + + if (timeout_usec != (uint64_t) -1 && (m == (uint64_t) -1 || timeout_usec < m)) + m = timeout_usec; + + p[0].fd = rtnl->fd; + p[0].events = e; + + r = ppoll(p, 1, m == (uint64_t) -1 ? NULL : timespec_store(&ts, m), NULL); + if (r < 0) + return -errno; + + return r > 0 ? 1 : 0; +} + +int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout_usec) { + assert_return(nl, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); + + if (nl->rqueue_size > 0) + return 0; + + return rtnl_poll(nl, false, timeout_usec); +} + +static int timeout_compare(const void *a, const void *b) { + const struct reply_callback *x = a, *y = b; + + if (x->timeout != 0 && y->timeout == 0) + return -1; + + if (x->timeout == 0 && y->timeout != 0) + return 1; + + if (x->timeout < y->timeout) + return -1; + + if (x->timeout > y->timeout) + return 1; + + return 0; +} + +int sd_rtnl_call_async(sd_rtnl *nl, + sd_rtnl_message *m, + sd_rtnl_message_handler_t callback, + void *userdata, + uint64_t usec, + uint32_t *serial) { + struct reply_callback *c; + uint32_t s; + int r, k; + + assert_return(nl, -EINVAL); + assert_return(m, -EINVAL); + assert_return(callback, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); - r = socket_read_message(nl, &reply); + r = hashmap_ensure_allocated(&nl->reply_callbacks, uint64_hash_func, uint64_compare_func); + if (r < 0) + return r; + + if (usec != (uint64_t) -1) { + r = prioq_ensure_allocated(&nl->reply_callbacks_prioq, timeout_compare); if (r < 0) return r; + } + c = new0(struct reply_callback, 1); + if (!c) + return -ENOMEM; + + c->callback = callback; + c->userdata = userdata; + c->timeout = calc_elapse(usec); + + k = sd_rtnl_send(nl, m, &s); + if (k < 0) { + free(c); + return k; + } + + c->serial = s; + + r = hashmap_put(nl->reply_callbacks, &c->serial, c); + if (r < 0) { + free(c); + return r; + } + + if (c->timeout != 0) { + r = prioq_put(nl->reply_callbacks_prioq, c, &c->prioq_idx); if (r > 0) { - int received_serial = message_get_serial(reply); + c->timeout = 0; + sd_rtnl_call_async_cancel(nl, c->serial); + return r; + } + } + + if (serial) + *serial = s; + + return k; +} + +int sd_rtnl_call_async_cancel(sd_rtnl *nl, uint32_t serial) { + struct reply_callback *c; + uint64_t s = serial; + + assert_return(nl, -EINVAL); + assert_return(serial != 0, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); + + c = hashmap_remove(nl->reply_callbacks, &s); + if (!c) + return 0; + + if (c->timeout != 0) + prioq_remove(nl->reply_callbacks_prioq, c, &c->prioq_idx); + + free(c); + return 1; +} + +int sd_rtnl_call(sd_rtnl *nl, + sd_rtnl_message *message, + uint64_t usec, + sd_rtnl_message **ret) { + usec_t timeout; + uint32_t serial; + bool room = false; + int r; + + assert_return(nl, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); + assert_return(message, -EINVAL); + + r = sd_rtnl_send(nl, message, &serial); + if (r < 0) + return r; + + timeout = calc_elapse(usec); + + for (;;) { + usec_t left; + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *incoming = NULL; + + if (!room) { + sd_rtnl_message **q; + + if (nl->rqueue_size >= RTNL_RQUEUE_MAX) + return -ENOBUFS; + + /* Make sure there's room for queueing this + * locally, before we read the message */ + + q = realloc(nl->rqueue, (nl->rqueue_size + 1) * sizeof(sd_rtnl_message*)); + if (!q) + return -ENOMEM; + + nl->rqueue = q; + room = true; + } + + r = socket_read_message(nl, &incoming); + if (r < 0) + return r; + if (incoming) { + uint32_t received_serial = message_get_serial(incoming); if (received_serial == serial) { - r = message_get_errno(reply); + r = sd_rtnl_message_get_errno(incoming); if (r < 0) return r; if (ret) { - *ret = reply; - reply = NULL; + *ret = incoming; + incoming = NULL; } - break;; + return 1; } + + /* Room was allocated on the queue above */ + nl->rqueue[nl->rqueue_size ++] = incoming; + incoming = NULL; + room = false; + + /* Try to read more, right away */ + continue; } + if (r != 0) + continue; + + if (timeout > 0) { + usec_t n; + + n = now(CLOCK_MONOTONIC); + if (n >= timeout) + return -ETIMEDOUT; + + left = timeout - n; + } else + left = (uint64_t) -1; + + r = rtnl_poll(nl, true, left); + if (r < 0) + return r; + + r = dispatch_wqueue(nl); + if (r < 0) + return r; + } +} + +int sd_rtnl_flush(sd_rtnl *rtnl) { + int r; + + assert_return(rtnl, -EINVAL); + assert_return(!rtnl_pid_changed(rtnl), -ECHILD); + + if (rtnl->wqueue_size <= 0) + return 0; + + for (;;) { + r = dispatch_wqueue(rtnl); + if (r < 0) + return r; + + if (rtnl->wqueue_size <= 0) + return 0; + + r = rtnl_poll(rtnl, false, (uint64_t) -1); + if (r < 0) + return r; + } +} + +int sd_rtnl_get_events(sd_rtnl *rtnl) { + int flags = 0; + + assert_return(rtnl, -EINVAL); + assert_return(!rtnl_pid_changed(rtnl), -ECHILD); + + if (rtnl->rqueue_size <= 0) + flags |= POLLIN; + if (rtnl->wqueue_size > 0) + flags |= POLLOUT; + + return flags; +} + +int sd_rtnl_get_timeout(sd_rtnl *rtnl, uint64_t *timeout_usec) { + struct reply_callback *c; + + assert_return(rtnl, -EINVAL); + assert_return(timeout_usec, -EINVAL); + assert_return(!rtnl_pid_changed(rtnl), -ECHILD); + + if (rtnl->rqueue_size > 0) { + *timeout_usec = 0; + return 1; + } + + c = prioq_peek(rtnl->reply_callbacks_prioq); + if (!c) { + *timeout_usec = (uint64_t) -1; + return 0; + } + + *timeout_usec = c->timeout; + + return 1; +} + +static int io_callback(sd_event_source *s, int fd, uint32_t revents, void *userdata) { + sd_rtnl *rtnl = userdata; + int r; + + assert(rtnl); + + r = sd_rtnl_process(rtnl, NULL); + if (r < 0) + return r; + + return 1; +} + +static int time_callback(sd_event_source *s, uint64_t usec, void *userdata) { + sd_rtnl *rtnl = userdata; + int r; + + assert(rtnl); + + r = sd_rtnl_process(rtnl, NULL); + if (r < 0) + return r; + + return 1; +} + +static int prepare_callback(sd_event_source *s, void *userdata) { + sd_rtnl *rtnl = userdata; + int r, e; + usec_t until; + + assert(s); + assert(rtnl); + + e = sd_rtnl_get_events(rtnl); + if (e < 0) + return e; + + r = sd_event_source_set_io_events(rtnl->io_event_source, e); + if (r < 0) + return r; + + r = sd_rtnl_get_timeout(rtnl, &until); + if (r < 0) + return r; + if (r > 0) { + int j; + + j = sd_event_source_set_time(rtnl->time_event_source, until); + if (j < 0) + return j; + } + + r = sd_event_source_set_enabled(rtnl->time_event_source, r > 0); + if (r < 0) + return r; + + return 1; +} + +static int exit_callback(sd_event_source *event, void *userdata) { + sd_rtnl *rtnl = userdata; + + assert(event); + + sd_rtnl_flush(rtnl); + + return 1; +} + +int sd_rtnl_attach_event(sd_rtnl *rtnl, sd_event *event, int priority) { + int r; + + assert_return(rtnl, -EINVAL); + assert_return(!rtnl->event, -EBUSY); + + assert(!rtnl->io_event_source); + assert(!rtnl->time_event_source); + + if (event) + rtnl->event = sd_event_ref(event); + else { + r = sd_event_default(&rtnl->event); + if (r < 0) + return r; } + r = sd_event_add_io(rtnl->event, rtnl->fd, 0, io_callback, rtnl, &rtnl->io_event_source); + if (r < 0) + goto fail; + + r = sd_event_source_set_priority(rtnl->io_event_source, priority); + if (r < 0) + goto fail; + + r = sd_event_source_set_prepare(rtnl->io_event_source, prepare_callback); + if (r < 0) + goto fail; + + r = sd_event_add_monotonic(rtnl->event, 0, 0, time_callback, rtnl, &rtnl->time_event_source); + if (r < 0) + goto fail; + + r = sd_event_source_set_priority(rtnl->time_event_source, priority); + if (r < 0) + goto fail; + + r = sd_event_add_exit(rtnl->event, exit_callback, rtnl, &rtnl->exit_event_source); + if (r < 0) + goto fail; + + return 0; + +fail: + sd_rtnl_detach_event(rtnl); + return r; +} + +int sd_rtnl_detach_event(sd_rtnl *rtnl) { + assert_return(rtnl, -EINVAL); + assert_return(rtnl->event, -ENXIO); + + if (rtnl->io_event_source) + rtnl->io_event_source = sd_event_source_unref(rtnl->io_event_source); + + if (rtnl->time_event_source) + rtnl->time_event_source = sd_event_source_unref(rtnl->time_event_source); + + if (rtnl->exit_event_source) + rtnl->exit_event_source = sd_event_source_unref(rtnl->exit_event_source); + + if (rtnl->event) + rtnl->event = sd_event_unref(rtnl->event); + + return 0; +} + +int sd_rtnl_add_match(sd_rtnl *rtnl, + uint16_t type, + sd_rtnl_message_handler_t callback, + void *userdata) { + struct match_callback *c; + + assert_return(rtnl, -EINVAL); + assert_return(callback, -EINVAL); + assert_return(!rtnl_pid_changed(rtnl), -ECHILD); + assert_return(message_type_is_link(type) || message_type_is_addr(type) || message_type_is_route(type), -ENOTSUP); + + c = new0(struct match_callback, 1); + if (!c) + return -ENOMEM; + + c->callback = callback; + c->type = type; + c->userdata = userdata; + + LIST_PREPEND(match_callbacks, rtnl->match_callbacks, c); + + return 0; +} + +int sd_rtnl_remove_match(sd_rtnl *rtnl, + uint16_t type, + sd_rtnl_message_handler_t callback, + void *userdata) { + struct match_callback *c; + + assert_return(rtnl, -EINVAL); + assert_return(callback, -EINVAL); + assert_return(!rtnl_pid_changed(rtnl), -ECHILD); + + LIST_FOREACH(match_callbacks, c, rtnl->match_callbacks) + if (c->callback == callback && c->type == type && c->userdata == userdata) { + LIST_REMOVE(match_callbacks, rtnl->match_callbacks, c); + free(c); + + return 1; + } + return 0; }