chiark / gitweb /
journal-remote: avoid copying input data
authorZbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>
Sat, 12 Jul 2014 03:20:08 +0000 (23:20 -0400)
committerZbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>
Wed, 16 Jul 2014 02:34:42 +0000 (22:34 -0400)
Instead of copying fields into new memory allocations, simply keep pointers
into the receive buffer. Data in this buffer is only copied when there is not
enough space for new data and a large chunk of the buffer contains old data.

src/journal-remote/journal-remote-parse.c
src/journal-remote/journal-remote-parse.h
src/journal-remote/journal-remote-write.c
src/journal-remote/journal-remote-write.h

index cdc920eb4c3cb439f120e7a0bad86e28114a4697..dfb87d49af7f88a17b1de4751f751c088f09afac 100644 (file)
@@ -70,24 +70,38 @@ RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
         return source;
 }
 
         return source;
 }
 
+static char* realloc_buffer(RemoteSource *source, size_t size) {
+        char *b, *old = source->buf;
+
+        b = GREEDY_REALLOC(source->buf, source->size, size);
+        if (!b)
+                return NULL;
+
+        iovw_rebase(&source->iovw, old, source->buf);
+
+        return b;
+}
+
 static int get_line(RemoteSource *source, char **line, size_t *size) {
 static int get_line(RemoteSource *source, char **line, size_t *size) {
-        ssize_t n, remain;
+        ssize_t n;
         char *c = NULL;
         char *c = NULL;
-        char *newbuf = NULL;
-        size_t newsize = 0;
 
         assert(source);
         assert(source->state == STATE_LINE);
 
         assert(source);
         assert(source->state == STATE_LINE);
+        assert(source->offset <= source->filled);
         assert(source->filled <= source->size);
         assert(source->buf == NULL || source->size > 0);
         assert(source->fd >= 0);
 
         while (true) {
         assert(source->filled <= source->size);
         assert(source->buf == NULL || source->size > 0);
         assert(source->fd >= 0);
 
         while (true) {
-                if (source->buf)
-                        c = memchr(source->buf + source->scanned, '\n',
-                                   source->filled - source->scanned);
-                if (c != NULL)
-                        break;
+                if (source->buf) {
+                        size_t start = MAX(source->scanned, source->offset);
+
+                        c = memchr(source->buf + start, '\n',
+                                   source->filled - start);
+                        if (c != NULL)
+                                break;
+                }
 
                 source->scanned = source->filled;
                 if (source->scanned >= DATA_SIZE_MAX) {
 
                 source->scanned = source->filled;
                 if (source->scanned >= DATA_SIZE_MAX) {
@@ -100,15 +114,12 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
                         return -EWOULDBLOCK;
 
                 if (source->size - source->filled < LINE_CHUNK &&
                         return -EWOULDBLOCK;
 
                 if (source->size - source->filled < LINE_CHUNK &&
-                    !GREEDY_REALLOC(source->buf, source->size,
-                                    MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
+                    !realloc_buffer(source,
+                                    MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
                                 return log_oom();
 
                 assert(source->size - source->filled >= LINE_CHUNK ||
                                 return log_oom();
 
                 assert(source->size - source->filled >= LINE_CHUNK ||
-                       source->size == DATA_SIZE_MAX);
-
-                // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
-                // to accomodate such big fields.
+                       source->size == ENTRY_SIZE_MAX);
 
                 n = read(source->fd, source->buf + source->filled,
                          source->size - source->filled);
 
                 n = read(source->fd, source->buf + source->filled,
                          source->size - source->filled);
@@ -123,23 +134,9 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
                 source->filled += n;
         }
 
                 source->filled += n;
         }
 
-        *line = source->buf;
-        *size = c + 1 - source->buf;
-
-        /* Check if something remains */
-        remain = source->buf + source->filled - c - 1;
-        assert(remain >= 0);
-        if (remain) {
-                newsize = MAX(remain, LINE_CHUNK);
-                newbuf = malloc(newsize);
-                if (!newbuf)
-                        return log_oom();
-                memcpy(newbuf, c + 1, remain);
-        }
-        source->buf = newbuf;
-        source->size = newsize;
-        source->filled = remain;
-        source->scanned = 0;
+        *line = source->buf + source->offset;
+        *size = c + 1 - source->buf - source->offset;
+        source->offset += *size;
 
         return 1;
 }
 
         return 1;
 }
@@ -148,8 +145,7 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
         assert(source);
         assert(source->state != STATE_EOF);
 
         assert(source);
         assert(source->state != STATE_EOF);
 
-        if (!GREEDY_REALLOC(source->buf, source->size,
-                            source->filled + size)) {
+        if (!realloc_buffer(source, source->filled + size)) {
                 log_error("Failed to store received data of size %zu "
                           "(in addition to existing %zu bytes with %zu filled): %s",
                           size, source->size, source->filled, strerror(ENOMEM));
                 log_error("Failed to store received data of size %zu "
                           "(in addition to existing %zu bytes with %zu filled): %s",
                           size, source->size, source->filled, strerror(ENOMEM));
@@ -163,28 +159,27 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
 }
 
 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
 }
 
 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
-        int n;
-        char *newbuf = NULL;
-        size_t newsize = 0, remain;
 
         assert(source);
         assert(source->state == STATE_DATA_START ||
                source->state == STATE_DATA ||
                source->state == STATE_DATA_FINISH);
         assert(size <= DATA_SIZE_MAX);
 
         assert(source);
         assert(source->state == STATE_DATA_START ||
                source->state == STATE_DATA ||
                source->state == STATE_DATA_FINISH);
         assert(size <= DATA_SIZE_MAX);
+        assert(source->offset <= source->filled);
         assert(source->filled <= source->size);
         assert(source->filled <= source->size);
-        assert(source->scanned <= source->filled);
         assert(source->buf != NULL || source->size == 0);
         assert(source->buf == NULL || source->size > 0);
         assert(source->fd >= 0);
         assert(data);
 
         assert(source->buf != NULL || source->size == 0);
         assert(source->buf == NULL || source->size > 0);
         assert(source->fd >= 0);
         assert(data);
 
-        while(source->filled < size) {
+        while (source->filled - source->offset < size) {
+                int n;
+
                 if (source->passive_fd)
                         /* we have to wait for some data to come to us */
                         return -EWOULDBLOCK;
 
                 if (source->passive_fd)
                         /* we have to wait for some data to come to us */
                         return -EWOULDBLOCK;
 
-                if (!GREEDY_REALLOC(source->buf, source->size, size))
+                if (!realloc_buffer(source, source->offset + size))
                         return log_oom();
 
                 n = read(source->fd, source->buf + source->filled,
                         return log_oom();
 
                 n = read(source->fd, source->buf + source->filled,
@@ -200,29 +195,15 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
                 source->filled += n;
         }
 
                 source->filled += n;
         }
 
-        *data = source->buf;
-
-        /* Check if something remains */
-        assert(size <= source->filled);
-        remain = source->filled - size;
-        if (remain) {
-                newsize = MAX(remain, LINE_CHUNK);
-                newbuf = malloc(newsize);
-                if (!newbuf)
-                        return log_oom();
-                memcpy(newbuf, source->buf + size, remain);
-        }
-        source->buf = newbuf;
-        source->size = newsize;
-        source->filled = remain;
-        source->scanned = 0;
+        *data = source->buf + source->offset;
+        source->offset += size;
 
         return 1;
 }
 
 static int get_data_size(RemoteSource *source) {
         int r;
 
         return 1;
 }
 
 static int get_data_size(RemoteSource *source) {
         int r;
-        _cleanup_free_ void *data = NULL;
+        void *data;
 
         assert(source);
         assert(source->state == STATE_DATA_START);
 
         assert(source);
         assert(source->state == STATE_DATA_START);
@@ -260,7 +241,7 @@ static int get_data_data(RemoteSource *source, void **data) {
 
 static int get_data_newline(RemoteSource *source) {
         int r;
 
 static int get_data_newline(RemoteSource *source) {
         int r;
-        _cleanup_free_ char *data = NULL;
+        char *data;
 
         assert(source);
         assert(source->state == STATE_DATA_FINISH);
 
         assert(source);
         assert(source->state == STATE_DATA_FINISH);
@@ -350,15 +331,12 @@ int process_data(RemoteSource *source) {
 
                 if (n == 1) {
                         log_debug("Received empty line, event is ready");
 
                 if (n == 1) {
                         log_debug("Received empty line, event is ready");
-                        free(line);
                         return 1;
                 }
 
                 r = process_dunder(source, line, n);
                         return 1;
                 }
 
                 r = process_dunder(source, line, n);
-                if (r != 0) {
-                        free(line);
+                if (r != 0)
                         return r < 0 ? r : 0;
                         return r < 0 ? r : 0;
-                }
 
                 /* MESSAGE=xxx\n
                    or
 
                 /* MESSAGE=xxx\n
                    or
@@ -377,7 +355,6 @@ int process_data(RemoteSource *source) {
                 r = iovw_put(&source->iovw, line, n);
                 if (r < 0) {
                         log_error("Failed to put line in iovect");
                 r = iovw_put(&source->iovw, line, n);
                 if (r < 0) {
                         log_error("Failed to put line in iovect");
-                        free(line);
                         return r;
                 }
 
                         return r;
                 }
 
@@ -450,6 +427,7 @@ int process_data(RemoteSource *source) {
 }
 
 int process_source(RemoteSource *source, bool compress, bool seal) {
 }
 
 int process_source(RemoteSource *source, bool compress, bool seal) {
+        size_t remain, target;
         int r;
 
         assert(source);
         int r;
 
         assert(source);
@@ -480,5 +458,36 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
 
  freeing:
         iovw_free_contents(&source->iovw);
 
  freeing:
         iovw_free_contents(&source->iovw);
+
+        /* possibly reset buffer position */
+        remain = source->filled - source->offset;
+
+        if (remain == 0) /* no brainer */
+                source->offset = source->scanned = source->filled = 0;
+        else if (source->offset > source->size - source->filled &&
+                 source->offset > remain) {
+                memcpy(source->buf, source->buf + source->offset, remain);
+                source->offset = source->scanned = 0;
+                source->filled = remain;
+        }
+
+        target = source->size;
+        while (target > 16 * LINE_CHUNK && remain < target / 2)
+                target /= 2;
+        if (target < source->size) {
+                char *tmp;
+
+                tmp = realloc(source->buf, target);
+                if (tmp)
+                        log_warning("Failed to reallocate buffer to (smaller) size %zu",
+                                    target);
+                else {
+                        log_debug("Reallocated buffer from %zu to %zu bytes",
+                                  source->size, target);
+                        source->buf = tmp;
+                        source->size = target;
+                }
+        }
+
         return r;
 }
         return r;
 }
index 07d6ddb67feb5a4ad2abce61445a76f84f835f37..8499f4eb828cf7a325b19a07b31fb55f622dae99 100644 (file)
@@ -38,10 +38,11 @@ typedef struct RemoteSource {
         bool passive_fd;
 
         char *buf;
         bool passive_fd;
 
         char *buf;
-        size_t size;
-        size_t scanned;
-        size_t filled;
-        size_t data_size;
+        size_t size;       /* total size of the buffer */
+        size_t offset;     /* offset to the beginning of live data in the buffer */
+        size_t scanned;    /* number of bytes since the beginning of data without a newline */
+        size_t filled;     /* total number of bytes in the buffer */
+        size_t data_size;  /* size of the binary data chunk being processed */
 
         struct iovec_wrapper iovw;
 
 
         struct iovec_wrapper iovw;
 
index cdd06f9effc25a9238eb289e1b9121010304fbb4..bec4cb1f7b9f640337dac473e9cccdf3b3baf7e9 100644 (file)
@@ -31,8 +31,6 @@ int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
 }
 
 void iovw_free_contents(struct iovec_wrapper *iovw) {
 }
 
 void iovw_free_contents(struct iovec_wrapper *iovw) {
-        for (size_t j = 0; j < iovw->count; j++)
-                free(iovw->iovec[j].iov_base);
         free(iovw->iovec);
         iovw->iovec = NULL;
         iovw->size_bytes = iovw->count = 0;
         free(iovw->iovec);
         iovw->iovec = NULL;
         iovw->size_bytes = iovw->count = 0;
@@ -41,12 +39,19 @@ void iovw_free_contents(struct iovec_wrapper *iovw) {
 size_t iovw_size(struct iovec_wrapper *iovw) {
         size_t n = 0, i;
 
 size_t iovw_size(struct iovec_wrapper *iovw) {
         size_t n = 0, i;
 
-        for(i = 0; i < iovw->count; i++)
+        for (i = 0; i < iovw->count; i++)
                 n += iovw->iovec[i].iov_len;
 
         return n;
 }
 
                 n += iovw->iovec[i].iov_len;
 
         return n;
 }
 
+void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
+        size_t i;
+
+        for (i = 0; i < iovw->count; i++)
+                iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
+}
+
 /**********************************************************************
  **********************************************************************
  **********************************************************************/
 /**********************************************************************
  **********************************************************************
  **********************************************************************/
index 9c5a641d2e01749cf1dc3c77356bad1895b82bb6..aa381c661ee32ec7cb027bc544a60d949c629fd4 100644 (file)
@@ -36,6 +36,7 @@ struct iovec_wrapper {
 int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len);
 void iovw_free_contents(struct iovec_wrapper *iovw);
 size_t iovw_size(struct iovec_wrapper *iovw);
 int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len);
 void iovw_free_contents(struct iovec_wrapper *iovw);
 size_t iovw_size(struct iovec_wrapper *iovw);
+void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new);
 
 typedef struct Writer {
         JournalFile *journal;
 
 typedef struct Writer {
         JournalFile *journal;