chiark / gitweb /
journal/compress: add stream compression/decompression functions
author Zbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>
Wed, 25 Jun 2014 01:24:46 +0000 (21:24 -0400)
committer Zbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>
Thu, 26 Jun 2014 05:41:04 +0000 (01:41 -0400)
src/journal/compress.c
src/journal/compress.h
src/journal/test-compress.c
src/shared/copy.c

index cafe8f4..f36c430 100644 (file)
 #include <assert.h>
 #include <stdlib.h>
 #include <string.h>
+#include <unistd.h>
 #include <lzma.h>
 
-#include "macro.h"
 #include "compress.h"
+#include "macro.h"
+#include "util.h"
 
 bool compress_blob(const void *src, uint64_t src_size, void *dst, uint64_t *dst_size) {
         lzma_ret ret;
@@ -40,12 +42,12 @@ bool compress_blob(const void *src, uint64_t src_size, void *dst, uint64_t *dst_
          * compressed result is longer than the original */
 
         ret = lzma_easy_buffer_encode(LZMA_PRESET_DEFAULT, LZMA_CHECK_NONE, NULL,
-                                      src, src_size, dst, &out_pos, *dst_size);
+                                      src, src_size, dst, &out_pos, src_size);
         if (ret != LZMA_OK)
                 return false;
 
         /* Is it actually shorter? */
-        if (out_pos == *dst_size)
+        if (out_pos == src_size)
                 return false;
 
         *dst_size = out_pos;
@@ -200,3 +202,149 @@ fail:
 
         return b;
 }
+
+/* XZ-compress the data read from fdf and write the compressed stream to fdt.
+ *
+ * At most max_bytes bytes are read from fdf; pass -1 to read until EOF.
+ *
+ * Returns 0 on success, or a negative errno-style code:
+ *   -EINVAL   the encoder could not be initialized
+ *   -errno    read() on fdf failed
+ *   -EBADMSG  lzma_code() reported an error
+ *   the error returned by loop_write(), or -errno / -EIO on a short write
+ */
+int compress_stream(int fdf, int fdt, off_t max_bytes) {
+        _cleanup_(lzma_end) lzma_stream s = LZMA_STREAM_INIT;
+        lzma_ret ret;
+
+        uint8_t buf[BUFSIZ], out[BUFSIZ];
+        lzma_action action = LZMA_RUN;
+
+        assert(fdf >= 0);
+        assert(fdt >= 0);
+
+        ret = lzma_easy_encoder(&s, LZMA_PRESET_DEFAULT, LZMA_CHECK_CRC64);
+        if (ret != LZMA_OK) {
+                log_error("Failed to initialize XZ encoder: code %d", ret);
+                return -EINVAL;
+        }
+
+        for (;;) {
+                /* Refill the input buffer once the encoder has consumed it,
+                 * unless we already hit EOF (or the byte limit). */
+                if (s.avail_in == 0 && action == LZMA_RUN) {
+                        size_t m = sizeof(buf);
+                        ssize_t n;
+
+                        if (max_bytes != -1 && m > (size_t) max_bytes)
+                                m = max_bytes;
+
+                        n = read(fdf, buf, m);
+                        if (n < 0)
+                                return -errno;
+                        if (n == 0)
+                                /* No more input: ask the encoder to flush and finish */
+                                action = LZMA_FINISH;
+                        else {
+                                s.next_in = buf;
+                                s.avail_in = n;
+
+                                if (max_bytes != -1) {
+                                        assert(max_bytes >= n);
+                                        max_bytes -= n;
+                                }
+                        }
+                }
+
+                /* Hand the (empty or just flushed) output buffer back to the encoder */
+                if (s.avail_out == 0) {
+                        s.next_out = out;
+                        s.avail_out = sizeof(out);
+                }
+
+                ret = lzma_code(&s, action);
+                if (ret != LZMA_OK && ret != LZMA_STREAM_END) {
+                        log_error("Compression failed: code %d", ret);
+                        return -EBADMSG;
+                }
+
+                /* Flush when the output buffer is full or the stream is complete */
+                if (s.avail_out == 0 || ret == LZMA_STREAM_END) {
+                        ssize_t n, k;
+
+                        n = sizeof(out) - s.avail_out;
+
+                        errno = 0;
+                        k = loop_write(fdt, out, n, false);
+                        if (k < 0)
+                                return k;
+                        if (k != n)
+                                return errno ? -errno : -EIO;
+
+                        if (ret == LZMA_STREAM_END) {
+                                /* total_in/total_out are uint64_t, so use PRIu64 rather
+                                 * than %zu; total_in is 0 for empty input, so do not
+                                 * divide by it in that case. */
+                                log_debug("Compression finished (%"PRIu64" -> %"PRIu64" bytes, %.1f%%)",
+                                          s.total_in, s.total_out,
+                                          s.total_in > 0 ? (double) s.total_out / s.total_in * 100 : 0.0);
+
+                                return 0;
+                        }
+                }
+        }
+}
+
+/* Decompress the XZ stream read from fdf and write the result to fdt.
+ *
+ * Input is read from fdf until EOF. max_bytes limits the number of
+ * decompressed bytes written to fdt; pass -1 for no limit. The limit is
+ * checked each time an output chunk is about to be written, and the chunk
+ * that would exceed it is not written.
+ *
+ * Returns 0 on success, or a negative errno-style code:
+ *   -EINVAL   the decoder could not be initialized
+ *   -errno    read() on fdf failed
+ *   -EBADMSG  lzma_code() reported an error
+ *   -E2BIG    the decompressed data exceeds max_bytes
+ *   the error returned by loop_write(), or -errno / -EIO on a short write
+ */
+int decompress_stream(int fdf, int fdt, off_t max_bytes) {
+        _cleanup_(lzma_end) lzma_stream s = LZMA_STREAM_INIT;
+        lzma_ret ret;
+
+        uint8_t buf[BUFSIZ], out[BUFSIZ];
+        lzma_action action = LZMA_RUN;
+
+        assert(fdf >= 0);
+        assert(fdt >= 0);
+
+        ret = lzma_stream_decoder(&s, UINT64_MAX, 0);
+        if (ret != LZMA_OK) {
+                log_error("Failed to initialize XZ decoder: code %d", ret);
+                return -EINVAL;
+        }
+
+        for (;;) {
+                /* Refill the input buffer once the decoder has consumed it,
+                 * unless we already hit EOF. */
+                if (s.avail_in == 0 && action == LZMA_RUN) {
+                        ssize_t n;
+
+                        n = read(fdf, buf, sizeof(buf));
+                        if (n < 0)
+                                return -errno;
+                        if (n == 0)
+                                /* No more input: tell the decoder to finish */
+                                action = LZMA_FINISH;
+                        else {
+                                s.next_in = buf;
+                                s.avail_in = n;
+                        }
+                }
+
+                /* Hand the (empty or just flushed) output buffer back to the decoder */
+                if (s.avail_out == 0) {
+                        s.next_out = out;
+                        s.avail_out = sizeof(out);
+                }
+
+                ret = lzma_code(&s, action);
+                if (ret != LZMA_OK && ret != LZMA_STREAM_END) {
+                        log_error("Decompression failed: code %d", ret);
+                        return -EBADMSG;
+                }
+
+                /* Flush when the output buffer is full or the stream is complete */
+                if (s.avail_out == 0 || ret == LZMA_STREAM_END) {
+                        ssize_t n, k;
+
+                        n = sizeof(out) - s.avail_out;
+
+                        /* Enforce the output size limit before writing this chunk */
+                        if (max_bytes != -1) {
+                                if (max_bytes < n)
+                                        return -E2BIG;
+
+                                max_bytes -= n;
+                        }
+
+                        errno = 0;
+                        k = loop_write(fdt, out, n, false);
+                        if (k < 0)
+                                return k;
+                        if (k != n)
+                                return errno ? -errno : -EIO;
+
+                        if (ret == LZMA_STREAM_END) {
+                                /* total_in/total_out are uint64_t, so use PRIu64
+                                 * rather than %zu. */
+                                log_debug("Decompression finished (%"PRIu64" -> %"PRIu64" bytes, %.1f%%)",
+                                          s.total_in, s.total_out,
+                                          (double) s.total_out / s.total_in * 100);
+
+                                return 0;
+                        }
+                }
+        }
+}
index 2b87e73..f37a6b3 100644 (file)
@@ -23,6 +23,7 @@
 
 #include <inttypes.h>
 #include <stdbool.h>
+#include <unistd.h>
 
 bool compress_blob(const void *src, uint64_t src_size, void *dst, uint64_t *dst_size);
 
@@ -33,3 +34,6 @@ bool uncompress_startswith(const void *src, uint64_t src_size,
                            void **buffer, uint64_t *buffer_size,
                            const void *prefix, uint64_t prefix_len,
                            uint8_t extra);
+
+/* Stream (de)compression from file descriptor fdf to file descriptor fdt.
+ * Both return 0 on success or a negative errno-style code. max_bytes == -1
+ * means "no limit"; otherwise it bounds the bytes read from fdf when
+ * compressing and the bytes written to fdt when decompressing. */
+int compress_stream(int fdf, int fdt, off_t max_bytes);
+int decompress_stream(int fdf, int fdt, off_t max_bytes);
index 15b3f9a..b098ef9 100644 (file)
@@ -68,9 +68,63 @@ static void test_uncompress_startswith(void) {
                                         "barbarbar", 9, ' '));
 }
 
+/* Round-trip test for compress_stream()/decompress_stream().
+ *
+ * Compresses srcfile into a temporary file and verifies it with the external
+ * "xzcat" and "diff" tools, decompresses it again and compares against
+ * srcfile, then checks the two failure modes: -EBADMSG when decoding starts
+ * at offset 1 of the compressed file, and -E2BIG when the output limit is
+ * one byte smaller than srcfile. */
+static void test_compress_stream(const char *srcfile) {
+        _cleanup_close_ int src = -1, dst = -1, dst2 = -1;
+        char pattern[] = "/tmp/systemd-test.xz.XXXXXX",
+             pattern2[] = "/tmp/systemd-test.xz.XXXXXX";
+        int r;
+        /* Initialized to NULL: a cleanup handler is attached to these pointers */
+        _cleanup_free_ char *cmd = NULL, *cmd2 = NULL;
+        struct stat st = {};
+
+        log_debug("/* create source from %s */", srcfile);
+
+        assert_se((src = open(srcfile, O_RDONLY|O_CLOEXEC)) >= 0);
+
+        log_debug("/* test compression */");
+
+        assert_se((dst = mkostemp_safe(pattern, O_RDWR|O_CLOEXEC)) >= 0);
+
+        /* assert_se() rather than assert(), so the checks are not compiled
+         * out when NDEBUG is defined */
+        r = compress_stream(src, dst, -1);
+        assert_se(r == 0);
+
+        assert_se(asprintf(&cmd, "xzcat %s | diff %s -", pattern, srcfile) > 0);
+        assert_se(system(cmd) == 0);
+
+        log_debug("/* test decompression */");
+
+        assert_se((dst2 = mkostemp_safe(pattern2, O_RDWR|O_CLOEXEC)) >= 0);
+
+        assert_se(stat(srcfile, &st) == 0);
+
+        assert_se(lseek(dst, 0, SEEK_SET) == 0);
+        r = decompress_stream(dst, dst2, st.st_size);
+        assert_se(r == 0);
+
+        assert_se(asprintf(&cmd2, "diff %s %s", srcfile, pattern2) > 0);
+        assert_se(system(cmd2) == 0);
+
+        log_debug("/* test faulty decompression */");
+
+        /* Starting one byte into the compressed file must be rejected */
+        assert_se(lseek(dst, 1, SEEK_SET) == 1);
+        r = decompress_stream(dst, dst2, st.st_size);
+        assert_se(r == -EBADMSG);
+
+        /* An output limit one byte short of the original size must be enforced */
+        assert_se(lseek(dst, 0, SEEK_SET) == 0);
+        assert_se(lseek(dst2, 0, SEEK_SET) == 0);
+        r = decompress_stream(dst, dst2, st.st_size - 1);
+        assert_se(r == -E2BIG);
+
+        assert_se(unlink(pattern) == 0);
+        assert_se(unlink(pattern2) == 0);
+}
+
+/* Test entry point: runs the blob tests and the stream round-trip test. */
 int main(int argc, char *argv[]) {
+
+        /* Presumably raises the log level so the log_debug() progress
+         * messages in the tests are shown — confirm against log.h */
+        log_set_max_level(LOG_DEBUG);
+
         test_compress_uncompress();
         test_uncompress_startswith();
+        /* argv[0] (normally the path of this test binary) serves as the
+         * input file for the stream test */
+        test_compress_stream(argv[0]);
 
         return 0;
 }
index ebd6699..3744797 100644 (file)
@@ -29,9 +29,7 @@ int copy_bytes(int fdf, int fdt, off_t max_bytes) {
         for (;;) {
                 char buf[PIPE_BUF];
                 ssize_t n, k;
-                size_t m;
-
-                m = sizeof(buf);
+                size_t m = sizeof(buf);
 
                 if (max_bytes != (off_t) -1) {