X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=blobdiff_plain;f=src%2Fimport%2Fimport-job.c;h=809486500ba55cf6d6b045e5b1fda78956560d3f;hb=e28569311f5385cde76e4b84adbec6609b451cf9;hp=6de32686c5003533f5a8ec3eed1fd97c20ea1038;hpb=85dbc41dc67ff49fd8a843dbac5b8b5cb0b61155;p=elogind.git diff --git a/src/import/import-job.c b/src/import/import-job.c index 6de32686c..809486500 100644 --- a/src/import/import-job.c +++ b/src/import/import-job.c @@ -37,23 +37,23 @@ ImportJob* import_job_unref(ImportJob *j) { lzma_end(&j->xz); else if (j->compressed == IMPORT_JOB_GZIP) inflateEnd(&j->gzip); + else if (j->compressed == IMPORT_JOB_BZIP2) + BZ2_bzDecompressEnd(&j->bzip2); - if (j->hash_context) - gcry_md_close(j->hash_context); + if (j->checksum_context) + gcry_md_close(j->checksum_context); free(j->url); free(j->etag); strv_free(j->old_etags); free(j->payload); - free(j->sha256); + free(j->checksum); free(j); return NULL; } -DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref); - static void import_job_finish(ImportJob *j, int ret) { assert(j); @@ -63,6 +63,7 @@ static void import_job_finish(ImportJob *j, int ret) { if (ret == 0) { j->state = IMPORT_JOB_DONE; + j->progress_percent = 100; log_info("Download of %s complete.", j->url); } else { j->state = IMPORT_JOB_FAILED; @@ -124,23 +125,23 @@ void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) { goto finish; } - if (j->hash_context) { + if (j->checksum_context) { uint8_t *k; - k = gcry_md_read(j->hash_context, GCRY_MD_SHA256); + k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256); if (!k) { log_error("Failed to get checksum."); r = -EIO; goto finish; } - j->sha256 = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256)); - if (!j->sha256) { + j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256)); + if (!j->checksum) { r = log_oom(); goto finish; } - log_debug("SHA256 of %s is %s.", j->url, j->sha256); + log_debug("SHA256 of %s is %s.", j->url, j->checksum); } if (j->disk_fd >= 0 && j->allow_sparse) { @@ -180,7 +181,9 @@ static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) { assert(j); assert(p); - assert(sz > 0); + + if (sz <= 0) + return 0; if (j->written_uncompressed + sz < j->written_uncompressed) { log_error("File too large, overflow"); @@ -211,7 +214,7 @@ static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) { if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) return log_oom(); - memcpy((uint8_t*) j->payload + j->payload_size, p, sz); + memcpy(j->payload + j->payload_size, p, sz); j->payload_size += sz; } @@ -225,7 +228,9 @@ static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) { assert(j); assert(p); - assert(sz > 0); + + if (sz <= 0) + return 0; if (j->written_compressed + sz < j->written_compressed) { log_error("File too large, overflow"); @@ -243,8 +248,8 @@ static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) { return -EFBIG; } - if (j->hash_context) - gcry_md_write(j->hash_context, p, sz); + if (j->checksum_context) + gcry_md_write(j->checksum_context, p, sz); switch (j->compressed) { @@ -302,6 +307,29 @@ static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) { break; + case IMPORT_JOB_BZIP2: + j->bzip2.next_in = p; + j->bzip2.avail_in = sz; + + while (j->bzip2.avail_in > 0) { + uint8_t buffer[16 * 1024]; + + j->bzip2.next_out = (char*) buffer; + j->bzip2.avail_out = sizeof(buffer); + + r = BZ2_bzDecompress(&j->bzip2); + if (r != BZ_OK && r != BZ_STREAM_END) { + log_error("Decompression error."); + return -EIO; + } + + r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->bzip2.avail_out); + if (r < 0) + return r; + } + + break; + default: assert_not_reached("Unknown compression"); } @@ -335,8 +363,8 @@ static int import_job_open_disk(ImportJob *j) { } } - if (j->calc_hash) { - if (gcry_md_open(&j->hash_context, GCRY_MD_SHA256, 0) != 0) { + if (j->calc_checksum) { + if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) { log_error("Failed to initialize hash context."); return -EIO; } @@ -352,6 +380,9 @@ static int import_job_detect_compression(ImportJob *j) { static const uint8_t gzip_signature[] = { 0x1f, 0x8b }; + static const uint8_t bzip2_signature[] = { + 'B', 'Z', 'h' + }; _cleanup_free_ uint8_t *stub = NULL; size_t stub_size; @@ -360,18 +391,23 @@ static int import_job_detect_compression(ImportJob *j) { assert(j); - if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature))) + if (j->payload_size < MAX3(sizeof(xz_signature), + sizeof(gzip_signature), + sizeof(bzip2_signature))) return 0; if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0) j->compressed = IMPORT_JOB_XZ; else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0) j->compressed = IMPORT_JOB_GZIP; + else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0) + j->compressed = IMPORT_JOB_BZIP2; else j->compressed = IMPORT_JOB_UNCOMPRESSED; log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ)); log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP)); + log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == IMPORT_JOB_BZIP2)); if (j->compressed == IMPORT_JOB_XZ) { lzma_ret xzr; @@ -389,6 +425,13 @@ static int import_job_detect_compression(ImportJob *j) { return -EIO; } } + if (j->compressed == IMPORT_JOB_BZIP2) { + r = BZ2_bzDecompressInit(&j->bzip2, 0, 0); + if (r != BZ_OK) { + log_error("Failed to initialize bzip2 decoder."); + return -EIO; + } + } r = import_job_open_disk(j); if (r < 0) @@ -429,7 +472,7 @@ static size_t import_job_write_callback(void *contents, size_t size, size_t nmem goto fail; } - memcpy((uint8_t*) j->payload + j->payload_size, contents, sz); + memcpy(j->payload + j->payload_size, contents, sz); j->payload_size += sz; r = import_job_detect_compression(j); @@ -531,6 +574,12 @@ static size_t import_job_header_callback(void *contents, size_t size, size_t nme return sz; } + if (j->on_header) { + r = j->on_header(j, contents, sz); + if (r < 0) + goto fail; + } + return sz; fail: @@ -557,17 +606,25 @@ static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl char buf[FORMAT_TIMESPAN_MAX]; if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) { + char y[FORMAT_BYTES_MAX]; usec_t left, done; done = n - j->start_usec; left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done; - log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC)); + log_info("Got %u%% of %s. %s left at %s/s.", + percent, + j->url, + format_timespan(buf, sizeof(buf), left, USEC_PER_SEC), + format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC)))); } else log_info("Got %u%% of %s.", percent, j->url); j->progress_percent = percent; j->last_status_usec = n; + + if (j->on_progress) + j->on_progress(j); } return 0; @@ -625,10 +682,22 @@ int import_job_begin(ImportJob *j) { if (!hdr) return -ENOMEM; - j->request_header = curl_slist_new(hdr, NULL); - if (!j->request_header) - return -ENOMEM; + if (!j->request_header) { + j->request_header = curl_slist_new(hdr, NULL); + if (!j->request_header) + return -ENOMEM; + } else { + struct curl_slist *l; + + l = curl_slist_append(j->request_header, hdr); + if (!l) + return -ENOMEM; + + j->request_header = l; + } + } + if (j->request_header) { if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK) return -EIO; }