chiark / gitweb /
importd: add new bus calls for importing local tar and raw images
[elogind.git] / src / import / pull-job.c
index 165dae66193481055a7d8973df12f98d2ff6e8b6..ed9af2351ffe4ffe5ab16f8aab808094731b9a71 100644 (file)
@@ -25,9 +25,6 @@
 #include "machine-pool.h"
 #include "pull-job.h"
 
-/* Grow the /var/lib/machines directory after each 10MiB written */
-#define PULL_GROW_INTERVAL_BYTES (UINT64_C(10) * UINT64_C(1024) * UINT64_C(1024))
-
 PullJob* pull_job_unref(PullJob *j) {
         if (!j)
                 return NULL;
@@ -37,12 +34,7 @@ PullJob* pull_job_unref(PullJob *j) {
 
         safe_close(j->disk_fd);
 
-        if (j->compressed == PULL_JOB_XZ)
-                lzma_end(&j->xz);
-        else if (j->compressed == PULL_JOB_GZIP)
-                inflateEnd(&j->gzip);
-        else if (j->compressed == PULL_JOB_BZIP2)
-                BZ2_bzDecompressEnd(&j->bzip2);
+        import_compress_free(&j->compress);
 
         if (j->checksum_context)
                 gcry_md_close(j->checksum_context);
@@ -153,8 +145,7 @@ void pull_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
                  * sparse and we just seeked for the last part */
 
                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
-                        log_error_errno(errno, "Failed to truncate file: %m");
-                        r = -errno;
+                        r = log_error_errno(errno, "Failed to truncate file: %m");
                         goto finish;
                 }
 
@@ -180,7 +171,8 @@ finish:
         pull_job_finish(j, r);
 }
 
-static int pull_job_write_uncompressed(PullJob *j, void *p, size_t sz) {
+static int pull_job_write_uncompressed(const void *p, size_t sz, void *userdata) {
+        PullJob *j = userdata;
         ssize_t n;
 
         assert(j);
@@ -201,7 +193,7 @@ static int pull_job_write_uncompressed(PullJob *j, void *p, size_t sz) {
 
         if (j->disk_fd >= 0) {
 
-                if (j->grow_machine_directory && j->written_since_last_grow >= PULL_GROW_INTERVAL_BYTES) {
+                if (j->grow_machine_directory && j->written_since_last_grow >= GROW_INTERVAL_BYTES) {
                         j->written_since_last_grow = 0;
                         grow_machine_directory();
                 }
@@ -210,10 +202,8 @@ static int pull_job_write_uncompressed(PullJob *j, void *p, size_t sz) {
                         n = sparse_write(j->disk_fd, p, sz, 64);
                 else
                         n = write(j->disk_fd, p, sz);
-                if (n < 0) {
-                        log_error_errno(errno, "Failed to write file: %m");
-                        return -errno;
-                }
+                if (n < 0)
+                        return log_error_errno(errno, "Failed to write file: %m");
                 if ((size_t) n < sz) {
                         log_error("Short write");
                         return -EIO;
@@ -261,88 +251,9 @@ static int pull_job_write_compressed(PullJob *j, void *p, size_t sz) {
         if (j->checksum_context)
                 gcry_md_write(j->checksum_context, p, sz);
 
-        switch (j->compressed) {
-
-        case PULL_JOB_UNCOMPRESSED:
-                r = pull_job_write_uncompressed(j, p, sz);
-                if (r < 0)
-                        return r;
-
-                break;
-
-        case PULL_JOB_XZ:
-                j->xz.next_in = p;
-                j->xz.avail_in = sz;
-
-                while (j->xz.avail_in > 0) {
-                        uint8_t buffer[16 * 1024];
-                        lzma_ret lzr;
-
-                        j->xz.next_out = buffer;
-                        j->xz.avail_out = sizeof(buffer);
-
-                        lzr = lzma_code(&j->xz, LZMA_RUN);
-                        if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
-                                log_error("Decompression error.");
-                                return -EIO;
-                        }
-
-                        r = pull_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
-                        if (r < 0)
-                                return r;
-                }
-
-                break;
-
-        case PULL_JOB_GZIP:
-                j->gzip.next_in = p;
-                j->gzip.avail_in = sz;
-
-                while (j->gzip.avail_in > 0) {
-                        uint8_t buffer[16 * 1024];
-
-                        j->gzip.next_out = buffer;
-                        j->gzip.avail_out = sizeof(buffer);
-
-                        r = inflate(&j->gzip, Z_NO_FLUSH);
-                        if (r != Z_OK && r != Z_STREAM_END) {
-                                log_error("Decompression error.");
-                                return -EIO;
-                        }
-
-                        r = pull_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
-                        if (r < 0)
-                                return r;
-                }
-
-                break;
-
-        case PULL_JOB_BZIP2:
-                j->bzip2.next_in = p;
-                j->bzip2.avail_in = sz;
-
-                while (j->bzip2.avail_in > 0) {
-                        uint8_t buffer[16 * 1024];
-
-                        j->bzip2.next_out = (char*) buffer;
-                        j->bzip2.avail_out = sizeof(buffer);
-
-                        r = BZ2_bzDecompress(&j->bzip2);
-                        if (r != BZ_OK && r != BZ_STREAM_END) {
-                                log_error("Decompression error.");
-                                return -EIO;
-                        }
-
-                        r = pull_job_write_uncompressed(j,  buffer, sizeof(buffer) - j->bzip2.avail_out);
-                        if (r < 0)
-                                return r;
-                }
-
-                break;
-
-        default:
-                assert_not_reached("Unknown compression");
-        }
+        r = import_uncompress(&j->compress, p, sz, pull_job_write_uncompressed, j);
+        if (r < 0)
+                return r;
 
         j->written_compressed += sz;
 
@@ -384,16 +295,6 @@ static int pull_job_open_disk(PullJob *j) {
 }
 
 static int pull_job_detect_compression(PullJob *j) {
-        static const uint8_t xz_signature[] = {
-                0xfd, '7', 'z', 'X', 'Z', 0x00
-        };
-        static const uint8_t gzip_signature[] = {
-                0x1f, 0x8b
-        };
-        static const uint8_t bzip2_signature[] = {
-                'B', 'Z', 'h'
-        };
-
         _cleanup_free_ uint8_t *stub = NULL;
         size_t stub_size;
 
@@ -401,47 +302,13 @@ static int pull_job_detect_compression(PullJob *j) {
 
         assert(j);
 
-        if (j->payload_size < MAX3(sizeof(xz_signature),
-                                   sizeof(gzip_signature),
-                                   sizeof(bzip2_signature)))
+        r = import_uncompress_detect(&j->compress, j->payload, j->payload_size);
+        if (r < 0)
+                return log_error_errno(r, "Failed to initialize compressor: %m");
+        if (r == 0)
                 return 0;
 
-        if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
-                j->compressed = PULL_JOB_XZ;
-        else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
-                j->compressed = PULL_JOB_GZIP;
-        else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0)
-                j->compressed = PULL_JOB_BZIP2;
-        else
-                j->compressed = PULL_JOB_UNCOMPRESSED;
-
-        log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == PULL_JOB_XZ));
-        log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == PULL_JOB_GZIP));
-        log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == PULL_JOB_BZIP2));
-
-        if (j->compressed == PULL_JOB_XZ) {
-                lzma_ret xzr;
-
-                xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
-                if (xzr != LZMA_OK) {
-                        log_error("Failed to initialize XZ decoder.");
-                        return -EIO;
-                }
-        }
-        if (j->compressed == PULL_JOB_GZIP) {
-                r = inflateInit2(&j->gzip, 15+16);
-                if (r != Z_OK) {
-                        log_error("Failed to initialize gzip decoder.");
-                        return -EIO;
-                }
-        }
-        if (j->compressed == PULL_JOB_BZIP2) {
-                r = BZ2_bzDecompressInit(&j->bzip2, 0, 0);
-                if (r != BZ_OK) {
-                        log_error("Failed to initialize bzip2 decoder.");
-                        return -EIO;
-                }
-        }
+        log_debug("Stream is compressed: %s", import_compress_type_to_string(j->compress.type));
 
         r = pull_job_open_disk(j);
         if (r < 0)