chiark / gitweb /
rtnl: start adding support for asynchronous messaging
authorTom Gundersen <teg@jklm.no>
Tue, 12 Nov 2013 21:37:51 +0000 (22:37 +0100)
committerTom Gundersen <teg@jklm.no>
Wed, 13 Nov 2013 18:52:17 +0000 (19:52 +0100)
Similarly to sd-bus, add:

sd_rtnl_wait
sd_rtnl_process
sd_rtnl_send

and adapt sd_rtnl_call accordingly.

src/libsystemd-rtnl/rtnl-internal.h
src/libsystemd-rtnl/rtnl-message.c
src/libsystemd-rtnl/sd-rtnl.c
src/libsystemd-rtnl/test-rtnl.c
src/systemd/sd-rtnl.h

index b05290f..03297bb 100644 (file)
@@ -35,6 +35,14 @@ struct sd_rtnl {
                 struct sockaddr_nl nl;
         } sockaddr;
 
+        sd_rtnl_message **rqueue;
+        unsigned rqueue_size;
+
+        sd_rtnl_message **wqueue;
+        unsigned wqueue_size;
+
+        bool processing:1;
+
         uint32_t serial;
 
         pid_t original_pid;
@@ -42,8 +50,11 @@ struct sd_rtnl {
 
 #define RTNL_DEFAULT_TIMEOUT ((usec_t) (10 * USEC_PER_SEC))
 
+#define RTNL_WQUEUE_MAX 1024
+#define RTNL_RQUEUE_MAX 64*1024
+
 int message_get_errno(sd_rtnl_message *m);
-int message_get_serial(sd_rtnl_message *m);
+uint32_t message_get_serial(sd_rtnl_message *m);
 int message_seal(sd_rtnl *nl, sd_rtnl_message *m);
 int socket_write_message(sd_rtnl *nl, sd_rtnl_message *m);
 int socket_read_message(sd_rtnl *nl, sd_rtnl_message **ret);
index 941bd96..f7ff0a0 100644 (file)
@@ -367,7 +367,7 @@ int sd_rtnl_message_read(sd_rtnl_message *m, unsigned short *type, void **data)
         return message_read(m, type, data);
 }
 
-int message_get_serial(sd_rtnl_message *m) {
+uint32_t message_get_serial(sd_rtnl_message *m) {
         assert(m);
 
         return m->hdr->nlmsg_seq;
index eb3b01b..b375576 100644 (file)
@@ -46,6 +46,14 @@ static int sd_rtnl_new(sd_rtnl **ret) {
 
         rtnl->original_pid = getpid();
 
+        /* We guarantee that wqueue always has space for at least
+         * one entry */
+        rtnl->wqueue = new(sd_rtnl_message*, 1);
+        if (!rtnl->wqueue) {
+                free(rtnl);
+                return -ENOMEM;
+        }
+
         *ret = rtnl;
         return 0;
 }
@@ -98,23 +106,31 @@ sd_rtnl *sd_rtnl_ref(sd_rtnl *rtnl) {
 }
 
 sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) {
+
         if (rtnl && REFCNT_DEC(rtnl->n_ref) <= 0) {
+                unsigned i;
+
+                for (i = 0; i < rtnl->rqueue_size; i++)
+                        sd_rtnl_message_unref(rtnl->rqueue[i]);
+                free(rtnl->rqueue);
+
+                for (i = 0; i < rtnl->wqueue_size; i++)
+                        sd_rtnl_message_unref(rtnl->wqueue[i]);
+                free(rtnl->wqueue);
+
                 if (rtnl->fd >= 0)
                         close_nointr_nofail(rtnl->fd);
+
                 free(rtnl);
         }
 
         return NULL;
 }
 
-int sd_rtnl_call(sd_rtnl *nl,
-                sd_rtnl_message *message,
-                uint64_t usec,
-                sd_rtnl_message **ret) {
-        struct pollfd p[1] = {};
-        struct timespec left;
-        usec_t timeout;
-        int r, serial;
+int sd_rtnl_send(sd_rtnl *nl,
+                 sd_rtnl_message *message,
+                 uint32_t *serial) {
+        int r;
 
         assert_return(nl, -EINVAL);
         assert_return(!rtnl_pid_changed(nl), -ECHILD);
@@ -124,82 +140,260 @@ int sd_rtnl_call(sd_rtnl *nl,
         if (r < 0)
                 return r;
 
-        serial = message_get_serial(message);
+        if (nl->wqueue_size <= 0) {
+                /* send directly */
+                r = socket_write_message(nl, message);
+                if (r < 0)
+                        return r;
+                else if (r == 0) {
+                        /* nothing was sent, so let's put it on
+                         * the queue */
+                        nl->wqueue[0] = sd_rtnl_message_ref(message);
+                        nl->wqueue_size = 1;
+                }
+        } else {
+                sd_rtnl_message **q;
 
-        p[0].fd = nl->fd;
-        p[0].events = POLLOUT;
+                /* append to queue */
+                if (nl->wqueue_size >= RTNL_WQUEUE_MAX)
+                        return -ENOBUFS;
 
-        if (usec == (uint64_t) -1)
-                timeout = 0;
-        else if (usec == 0)
-                timeout = now(CLOCK_MONOTONIC) + RTNL_DEFAULT_TIMEOUT;
-        else
-                timeout = now(CLOCK_MONOTONIC) + usec;
+                q = realloc(nl->wqueue, sizeof(sd_rtnl_message*) * (nl->wqueue_size + 1));
+                if (!q)
+                        return -ENOMEM;
 
-        for (;;) {
-                if (timeout) {
-                        usec_t n;
+                nl->wqueue = q;
+                q[nl->wqueue_size ++] = sd_rtnl_message_ref(message);
+        }
 
-                        n = now(CLOCK_MONOTONIC);
-                        if (n >= timeout)
-                                return -ETIMEDOUT;
+        if (serial)
+                *serial = message_get_serial(message);
 
-                        timespec_store(&left, timeout - n);
-                }
+        return 1;
+}
 
-                r = ppoll(p, 1, timeout ? &left : NULL, NULL);
-                if (r < 0)
-                        return 0;
+static int dispatch_rqueue(sd_rtnl *rtnl, sd_rtnl_message **message) {
+        sd_rtnl_message *z = NULL;
+        int r;
 
-                r = socket_write_message(nl, message);
+        assert(rtnl);
+        assert(message);
+
+        if (rtnl->rqueue_size > 0) {
+                /* Dispatch a queued message */
+
+                *message = rtnl->rqueue[0];
+                rtnl->rqueue_size --;
+                memmove(rtnl->rqueue, rtnl->rqueue + 1, sizeof(sd_rtnl_message*) * rtnl->rqueue_size);
+
+                return 1;
+        }
+
+        /* Try to read a new message */
+        r = socket_read_message(rtnl, &z);
+        if (r < 0)
+                return r;
+        if (r == 0)
+                return 0;
+
+        *message = z;
+
+        return 1;
+}
+
+static int dispatch_wqueue(sd_rtnl *rtnl) {
+        int r, ret = 0;
+
+        assert(rtnl);
+
+        while (rtnl->wqueue_size > 0) {
+                r = socket_write_message(rtnl, rtnl->wqueue[0]);
                 if (r < 0)
                         return r;
-
-                if (r > 0) {
-                        break;
+                else if (r == 0)
+                        /* Didn't do anything this time */
+                        return ret;
+                else {
+                        /* see equivalent in sd-bus.c */
+                        sd_rtnl_message_unref(rtnl->wqueue[0]);
+                        rtnl->wqueue_size --;
+                        memmove(rtnl->wqueue, rtnl->wqueue + 1, sizeof(sd_rtnl_message*) * rtnl->wqueue_size);
+
+                        ret = 1;
                 }
         }
 
+        return ret;
+}
+
+static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) {
+        _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL;
+        int r;
+
+        r = dispatch_wqueue(rtnl);
+        if (r != 0)
+                goto null_message;
+
+        r = dispatch_rqueue(rtnl, &m);
+        if (r < 0)
+                return r;
+        if (!m)
+                goto null_message;
+
+        if (ret) {
+                *ret = m;
+                m = NULL;
+
+                return 1;
+        }
+
+        return 1;
+
+null_message:
+        if (r >= 0 && ret)
+                *ret = NULL;
+
+        return r;
+}
+int sd_rtnl_process(sd_rtnl *rtnl, sd_rtnl_message **ret) {
+        int r;
+
+        assert_return(rtnl, -EINVAL);
+        assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
+        assert_return(!rtnl->processing, -EBUSY);
+
+        rtnl->processing = true;
+        r = process_running(rtnl, ret);
+        rtnl->processing = false;
+
+        return r;
+}
+
+static usec_t calc_elapse(uint64_t usec) {
+        if (usec == (uint64_t) -1)
+                return 0;
+
+        if (usec == 0)
+                usec = RTNL_DEFAULT_TIMEOUT;
+
+        return now(CLOCK_MONOTONIC) + usec;
+}
+
+static int rtnl_poll(sd_rtnl *nl, uint64_t timeout_usec) {
+        struct pollfd p[1] = {};
+        struct timespec ts;
+        int r;
+
+        assert(nl);
+
+        p[0].fd = nl->fd;
         p[0].events = POLLIN;
 
+        r = ppoll(p, 1, timeout_usec == (uint64_t) -1 ? NULL :
+                        timespec_store(&ts, timeout_usec), NULL);
+        if (r < 0)
+                return -errno;
+
+        return r > 0 ? 1 : 0;
+}
+
+int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout_usec) {
+        assert_return(nl, -EINVAL);
+        assert_return(!rtnl_pid_changed(nl), -ECHILD);
+
+        if (nl->rqueue_size > 0)
+                return 0;
+
+        return rtnl_poll(nl, timeout_usec);
+}
+
+int sd_rtnl_call(sd_rtnl *nl,
+                sd_rtnl_message *message,
+                uint64_t usec,
+                sd_rtnl_message **ret) {
+        usec_t timeout;
+        uint32_t serial;
+        bool room = false;
+        int r;
+
+        assert_return(nl, -EINVAL);
+        assert_return(!rtnl_pid_changed(nl), -ECHILD);
+        assert_return(message, -EINVAL);
+
+        r = sd_rtnl_send(nl, message, &serial);
+        if (r < 0)
+                return r;
+
+        timeout = calc_elapse(usec);
+
         for (;;) {
-                _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *reply = NULL;
+                usec_t left;
+                _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *incoming = NULL;
 
-                if (timeout) {
-                        usec_t n;
+                if (!room) {
+                        sd_rtnl_message **q;
 
-                        n = now(CLOCK_MONOTONIC);
-                        if (n >= timeout)
-                                return -ETIMEDOUT;
+                        if (nl->rqueue_size >= RTNL_RQUEUE_MAX)
+                                return -ENOBUFS;
 
-                        timespec_store(&left, timeout - n);
-                }
+                        /* Make sure there's room for queueing this
+                         * locally, before we read the message */
 
-                r = ppoll(p, 1, timeout ? &left : NULL, NULL);
-                if (r < 0)
-                        return r;
+                        q = realloc(nl->rqueue, (nl->rqueue_size + 1) * sizeof(sd_rtnl_message*));
+                        if (!q)
+                                return -ENOMEM;
 
-                r = socket_read_message(nl, &reply);
+                        nl->rqueue = q;
+                        room = true;
+                }
+
+                r = socket_read_message(nl, &incoming);
                 if (r < 0)
                         return r;
-
-                if (r > 0) {
-                        int received_serial = message_get_serial(reply);
+                if (incoming) {
+                        uint32_t received_serial = message_get_serial(incoming);
 
                         if (received_serial == serial) {
-                                r = message_get_errno(reply);
+                                r = message_get_errno(incoming);
                                 if (r < 0)
                                         return r;
 
                                 if (ret) {
-                                        *ret = reply;
-                                        reply = NULL;
+                                        *ret = incoming;
+                                        incoming = NULL;
                                 }
 
-                                break;;
+                                return 1;
                         }
+
+                        /* Room was allocated on the queue above */
+                        nl->rqueue[nl->rqueue_size ++] = incoming;
+                        incoming = NULL;
+                        room = false;
+
+                        /* Try to read more, right away */
+                        continue;
                 }
-        }
+                if (r != 0)
+                        continue;
 
-        return 0;
+                if (timeout > 0) {
+                        usec_t n;
+
+                        n = now(CLOCK_MONOTONIC);
+                        if (n >= timeout)
+                                return -ETIMEDOUT;
+
+                        left = timeout - n;
+                } else
+                        left = (uint64_t) -1;
+
+                r = rtnl_poll(nl, left);
+                if (r < 0)
+                        return r;
+
+                r = dispatch_wqueue(nl);
+                if (r < 0)
+                        return r;
+        }
 }
index 61345bc..3615086 100644 (file)
@@ -53,7 +53,7 @@ static void test_link_configure(sd_rtnl *rtnl, int ifindex) {
         assert(type == IFLA_MTU);
         assert(mtu == *(unsigned int *) data);
 
-        assert(sd_rtnl_call(rtnl, message, 0, NULL) == 0);
+        assert(sd_rtnl_call(rtnl, message, 0, NULL) == 1);
 }
 
 static void test_route(void) {
@@ -133,7 +133,7 @@ int main(void) {
 
         assert(sd_rtnl_message_read(m, &type, &data) == 0);
 
-        assert(sd_rtnl_call(rtnl, m, 0, &r) >= 0);
+        assert(sd_rtnl_call(rtnl, m, 0, &r) == 1);
         assert(sd_rtnl_message_get_type(r, &type) >= 0);
         assert(type == RTM_NEWLINK);
 
@@ -155,7 +155,7 @@ int main(void) {
 
         assert(sd_rtnl_message_read(m, &type, &data) == 0);
 
-        assert(sd_rtnl_call(rtnl, m, -1, &r) >= 0);
+        assert(sd_rtnl_call(rtnl, m, -1, &r) == 1);
         while (sd_rtnl_message_read(r, &type, &data) > 0) {
                 switch (type) {
 //                        case IFLA_MTU:
index 2d166c4..87acc31 100644 (file)
@@ -37,9 +37,13 @@ int sd_rtnl_open(uint32_t groups, sd_rtnl **nl);
 sd_rtnl *sd_rtnl_ref(sd_rtnl *nl);
 sd_rtnl *sd_rtnl_unref(sd_rtnl *nl);
 
+int sd_rtnl_send(sd_rtnl *nl, sd_rtnl_message *message, uint32_t *serial);
 int sd_rtnl_call(sd_rtnl *nl, sd_rtnl_message *message, uint64_t timeout,
                  sd_rtnl_message **reply);
 
+int sd_rtnl_process(sd_rtnl *nl, sd_rtnl_message **ret);
+int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout);
+
 /* messages */
 int sd_rtnl_message_link_new(uint16_t msg_type, int index, unsigned int type,
                              unsigned int flags, sd_rtnl_message **ret);