1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2015 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/xattr.h>
25 #include "import-job.h"
/* Releases the resources held by an ImportJob: the curl handle and the
 * request header list, the disk file descriptor, the decompressor state
 * selected by j->compressed, the checksum context and the old ETag list.
 * NOTE(review): several lines of this function are not visible in this
 * excerpt (the XZ and gzip branch bodies, remaining frees and the return
 * value) -- confirm against the full file. */
27 ImportJob* import_job_unref(ImportJob *j) {
/* Detach the easy handle from the glue object and drop the header list. */
31 curl_glue_remove_and_free(j->glue, j->curl);
32 curl_slist_free_all(j->request_header);
34 safe_close(j->disk_fd);
/* Tear down whichever decompressor matches the detected compression. */
36 if (j->compressed == IMPORT_JOB_XZ)
38 else if (j->compressed == IMPORT_JOB_GZIP)
40 else if (j->compressed == IMPORT_JOB_BZIP2)
41 BZ2_bzDecompressEnd(&j->bzip2);
/* The checksum context only exists if one was opened for this job. */
43 if (j->checksum_context)
44 gcry_md_close(j->checksum_context);
48 strv_free(j->old_etags);
/* Moves the job into a terminal state: either IMPORT_JOB_DONE (progress set
 * to 100 and a completion message logged) or IMPORT_JOB_FAILED.
 * NOTE(review): the condition choosing between the two outcomes is not
 * visible in this excerpt; presumably it depends on ret -- confirm. */
57 static void import_job_finish(ImportJob *j, int ret) {
/* Jobs already in a terminal state are special-cased here (the body of this
 * check is not visible in this excerpt). */
60 if (j->state == IMPORT_JOB_DONE ||
61 j->state == IMPORT_JOB_FAILED)
65 j->state = IMPORT_JOB_DONE;
66 j->progress_percent = 100;
67 log_info("Download of %s complete.", j->url);
69 j->state = IMPORT_JOB_FAILED;
/* Completion handler for a curl transfer. Looks up the ImportJob stored as
 * the handle's CURLINFO_PRIVATE pointer, checks the transfer result and the
 * HTTP status, verifies the amount of data received, finalizes the checksum
 * and the on-disk file metadata, and reports the outcome through
 * import_job_finish().
 * NOTE(review): error-path statements between the visible lines are elided
 * in this excerpt. */
77 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
83 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
/* Nothing to do without a job, or for a job already in a terminal state. */
86 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
/* Failure reported by curl itself. */
89 if (result != CURLE_OK) {
90 log_error("Transfer failed: %s", curl_easy_strerror(result));
/* Classify the HTTP status: 304 marks the ETag as already present, any other
 * status >= 300 and any status < 200 is logged as an error. */
95 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
96 if (code != CURLE_OK) {
97 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
100 } else if (status == 304) {
101 log_info("Image already downloaded. Skipping download.");
102 j->etag_exists = true;
105 } else if (status >= 300) {
106 log_error("HTTP request to %s failed with code %li.", j->url, status);
109 } else if (status < 200) {
110 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
/* A job that is not in IMPORT_JOB_RUNNING here is reported as a premature
 * termination. */
115 if (j->state != IMPORT_JOB_RUNNING) {
116 log_error("Premature connection termination.");
/* (uint64_t) -1 is the content_length value assigned in import_job_new();
 * any other value must match the number of compressed bytes written. */
121 if (j->content_length != (uint64_t) -1 &&
122 j->content_length != j->written_compressed) {
123 log_error("Download truncated.");
/* Read the SHA256 digest and store it as a hex string in j->checksum. */
128 if (j->checksum_context) {
131 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
133 log_error("Failed to get checksum.");
138 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
144 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
147 if (j->disk_fd >= 0 && j->allow_sparse) {
148 /* Make sure the file size is right, in case the file was
149 * sparse and we just seeked for the last part */
151 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
152 log_error_errno(errno, "Failed to truncate file: %m");
/* Record ETag and URL as extended attributes; the (void) casts discard the
 * return values, so failures here are ignored.
 * NOTE(review): strlen(j->etag) requires j->etag to be non-NULL; the guard
 * for that is not visible in this excerpt -- confirm it exists. */
158 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
160 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
/* Apply j->mtime to the file's timestamps; results are likewise ignored. */
163 struct timespec ut[2];
165 timespec_store(&ut[0], j->mtime);
167 (void) futimens(j->disk_fd, ut);
169 (void) fd_setcrtime(j->disk_fd, j->mtime);
176 import_job_finish(j, r);
/* Consumes sz bytes of already-decompressed data at p. The data is written
 * to j->disk_fd when a descriptor is open; the visible code also appends
 * data to the in-memory j->payload buffer (the branch structure selecting
 * that path is not visible in this excerpt). On the visible success path
 * j->written_uncompressed is advanced by sz.
 * NOTE(review): return statements are elided in this excerpt. */
179 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
/* Reject a byte count that would wrap the running counter. */
188 if (j->written_uncompressed + sz < j->written_uncompressed) {
189 log_error("File too large, overflow");
/* Enforce the configured limit on uncompressed output. */
193 if (j->written_uncompressed + sz > j->uncompressed_max) {
194 log_error("File overly large, refusing");
198 if (j->disk_fd >= 0) {
/* Two write paths are visible: sparse_write() and plain write(). Which one
 * runs is decided by a condition not shown here (presumably
 * j->allow_sparse -- confirm). */
201 n = sparse_write(j->disk_fd, p, sz, 64);
203 n = write(j->disk_fd, p, sz);
205 log_error_errno(errno, "Failed to write file: %m");
/* Writing fewer bytes than requested is logged as an error. */
208 if ((size_t) n < sz) {
209 log_error("Short write");
/* Grow the payload buffer and append the new bytes to it. */
214 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
217 memcpy(j->payload + j->payload_size, p, sz);
218 j->payload_size += sz;
221 j->written_uncompressed += sz;
/* Consumes sz bytes of raw (possibly compressed) download data at p. Checks
 * the size limits, feeds the bytes into the checksum context if one exists,
 * then either passes them straight through or runs them through the XZ,
 * gzip or bzip2 decoder, forwarding each decoded chunk to
 * import_job_write_uncompressed(). On the visible success path
 * j->written_compressed is advanced by sz.
 * NOTE(review): some case labels, input-pointer setup, error returns and
 * break statements are elided in this excerpt. */
226 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
/* Reject a byte count that would wrap the running counter. */
235 if (j->written_compressed + sz < j->written_compressed) {
236 log_error("File too large, overflow");
/* Enforce the configured limit on compressed input. */
240 if (j->written_compressed + sz > j->compressed_max) {
241 log_error("File overly large, refusing.");
/* When a content length is known, receiving more than that is an error. */
245 if (j->content_length != (uint64_t) -1 &&
246 j->written_compressed + sz > j->content_length) {
247 log_error("Content length incorrect.");
/* The checksum is computed over the data as received, before decoding. */
251 if (j->checksum_context)
252 gcry_md_write(j->checksum_context, p, sz);
254 switch (j->compressed) {
256 case IMPORT_JOB_UNCOMPRESSED:
257 r = import_job_write_uncompressed(j, p, sz);
/* XZ: decode into a 16 KiB stack buffer until all input is consumed. */
267 while (j->xz.avail_in > 0) {
268 uint8_t buffer[16 * 1024];
271 j->xz.next_out = buffer;
272 j->xz.avail_out = sizeof(buffer);
274 lzr = lzma_code(&j->xz, LZMA_RUN);
275 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
276 log_error("Decompression error.");
/* Bytes produced in this round = buffer size minus remaining output room. */
280 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
/* gzip: same loop shape using zlib's inflate(). */
287 case IMPORT_JOB_GZIP:
289 j->gzip.avail_in = sz;
291 while (j->gzip.avail_in > 0) {
292 uint8_t buffer[16 * 1024];
294 j->gzip.next_out = buffer;
295 j->gzip.avail_out = sizeof(buffer);
297 r = inflate(&j->gzip, Z_NO_FLUSH);
298 if (r != Z_OK && r != Z_STREAM_END) {
299 log_error("Decompression error.");
303 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
/* bzip2: same loop shape using BZ2_bzDecompress(). */
310 case IMPORT_JOB_BZIP2:
311 j->bzip2.next_in = p;
312 j->bzip2.avail_in = sz;
314 while (j->bzip2.avail_in > 0) {
315 uint8_t buffer[16 * 1024];
317 j->bzip2.next_out = (char*) buffer;
318 j->bzip2.avail_out = sizeof(buffer);
320 r = BZ2_bzDecompress(&j->bzip2);
321 if (r != BZ_OK && r != BZ_STREAM_END) {
322 log_error("Decompression error.");
326 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->bzip2.avail_out);
334 assert_not_reached("Unknown compression");
337 j->written_compressed += sz;
/* Prepares the job's output: invokes the optional j->on_open_disk hook,
 * determines whether the disk file descriptor supports seeking (stored in
 * j->allow_sparse), and opens a SHA256 context when j->calc_checksum is set.
 * NOTE(review): error handling and return statements between the visible
 * lines are elided in this excerpt. */
342 static int import_job_open_disk(ImportJob *j) {
347 if (j->on_open_disk) {
348 r = j->on_open_disk(j);
353 if (j->disk_fd >= 0) {
354 /* Check if we can do sparse files */
/* NOTE(review): POSIX declares lseek(fd, offset, whence); this call passes
 * SEEK_SET as the offset and 0 as whence. It only behaves as intended where
 * SEEK_SET has the value 0 -- the arguments look swapped and should likely
 * read lseek(j->disk_fd, 0, SEEK_SET). */
356 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
357 j->allow_sparse = true;
360 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
362 j->allow_sparse = false;
366 if (j->calc_checksum) {
367 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
368 log_error("Failed to initialize hash context.");
/* Inspects the bytes buffered in j->payload to decide whether the stream is
 * XZ, gzip, bzip2 or uncompressed, initializes the matching decoder, opens
 * the output via import_job_open_disk(), switches the job to
 * IMPORT_JOB_RUNNING and feeds the buffered bytes through
 * import_job_write_compressed().
 * NOTE(review): the gzip/bzip2 signature bytes, several declarations, the
 * hand-over of j->payload to stub and the return statements are elided in
 * this excerpt. */
376 static int import_job_detect_compression(ImportJob *j) {
377 static const uint8_t xz_signature[] = {
378 0xfd, '7', 'z', 'X', 'Z', 0x00
380 static const uint8_t gzip_signature[] = {
383 static const uint8_t bzip2_signature[] = {
387 _cleanup_free_ uint8_t *stub = NULL;
/* Fewer buffered bytes than the longest signature is handled separately
 * (body not visible; presumably the function waits for more data). */
394 if (j->payload_size < MAX3(sizeof(xz_signature),
395 sizeof(gzip_signature),
396 sizeof(bzip2_signature)))
/* Compare the start of the payload against each signature in turn. */
399 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
400 j->compressed = IMPORT_JOB_XZ;
401 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
402 j->compressed = IMPORT_JOB_GZIP;
403 else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0)
404 j->compressed = IMPORT_JOB_BZIP2;
406 j->compressed = IMPORT_JOB_UNCOMPRESSED;
408 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
409 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
410 log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == IMPORT_JOB_BZIP2));
/* Initialize the decoder that matches the detected format. */
412 if (j->compressed == IMPORT_JOB_XZ) {
415 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
416 if (xzr != LZMA_OK) {
417 log_error("Failed to initialize XZ decoder.");
421 if (j->compressed == IMPORT_JOB_GZIP) {
/* windowBits 15+16: per the zlib manual, adding 16 selects gzip-format
 * decoding with a 15-bit window. */
422 r = inflateInit2(&j->gzip, 15+16);
424 log_error("Failed to initialize gzip decoder.");
428 if (j->compressed == IMPORT_JOB_BZIP2) {
429 r = BZ2_bzDecompressInit(&j->bzip2, 0, 0);
431 log_error("Failed to initialize bzip2 decoder.");
436 r = import_job_open_disk(j);
440 /* Now, take the payload we read so far, and decompress it */
442 stub_size = j->payload_size;
446 j->payload_allocated = 0;
/* The state changes to RUNNING before the buffered bytes are replayed. */
448 j->state = IMPORT_JOB_RUNNING;
450 r = import_job_write_compressed(j, stub, stub_size);
/* Write callback with the (contents, size, nmemb, userdata) signature; it is
 * installed via CURLOPT_WRITEFUNCTION in import_job_begin(). userdata is the
 * ImportJob and the chunk length is size * nmemb. While the job is in
 * IMPORT_JOB_ANALYZING the data is buffered in j->payload and compression
 * detection is attempted; in IMPORT_JOB_RUNNING the data goes to
 * import_job_write_compressed().
 * NOTE(review): the switch header, error handling and return values are
 * elided in this excerpt. */
457 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
458 ImportJob *j = userdata;
459 size_t sz = size * nmemb;
467 case IMPORT_JOB_ANALYZING:
468 /* Let's first check what it actually is */
/* Append the chunk to the payload buffer, then try to detect the format. */
470 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
475 memcpy(j->payload + j->payload_size, contents, sz);
476 j->payload_size += sz;
478 r = import_job_detect_compression(j);
484 case IMPORT_JOB_RUNNING:
486 r = import_job_write_compressed(j, contents, sz);
/* Terminal states share one case body (not visible in this excerpt). */
492 case IMPORT_JOB_DONE:
493 case IMPORT_JOB_FAILED:
498 assert_not_reached("Impossible state.");
/* NOTE(review): presumably the failure path -- confirm against full file. */
504 import_job_finish(j, r);
/* Header callback with the (contents, size, nmemb, userdata) signature; it
 * is installed via CURLOPT_HEADERFUNCTION in import_job_begin(). Parses the
 * ETag, Content-Length and Last-Modified headers out of the header data and
 * then calls j->on_header.
 * NOTE(review): the checks on the curl_header_strdup() results, a guard
 * around the j->on_header call (if any), declarations such as etag, and the
 * return values are elided in this excerpt. */
508 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
509 ImportJob *j = userdata;
510 size_t sz = size * nmemb;
511 _cleanup_free_ char *length = NULL, *last_modified = NULL;
/* Terminal states are handled up front; otherwise headers are only expected
 * while the job is still in IMPORT_JOB_ANALYZING. */
518 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
523 assert(j->state == IMPORT_JOB_ANALYZING);
525 r = curl_header_strdup(contents, sz, "ETag:", &etag);
/* An ETag found in j->old_etags marks the image as already downloaded and
 * finishes the job with result 0. */
534 if (strv_contains(j->old_etags, j->etag)) {
535 log_info("Image already downloaded. Skipping download.");
536 j->etag_exists = true;
537 import_job_finish(j, 0);
544 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
/* The parse result is discarded by the (void) cast. */
550 (void) safe_atou64(length, &j->content_length);
552 if (j->content_length != (uint64_t) -1) {
553 char bytes[FORMAT_BYTES_MAX];
/* An announced length above the compressed-size limit is logged as an
 * error. */
555 if (j->content_length > j->compressed_max) {
556 log_error("Content too large.");
561 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
567 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
/* The parse result is discarded by the (void) cast. */
573 (void) curl_parse_http_time(last_modified, &j->mtime);
578 r = j->on_header(j, contents, sz);
/* NOTE(review): presumably the failure path -- confirm against full file. */
586 import_job_finish(j, r);
/* Transfer-progress callback; it is installed via CURLOPT_XFERINFOFUNCTION
 * in import_job_begin(). Computes the download percentage and logs progress,
 * requiring at least that more than USEC_PER_SEC has passed since the last
 * message and that the percentage changed (one more condition is elided).
 * When more than one second has elapsed since start and dlnow > 0, the
 * message also includes the estimated time left and the transfer rate.
 * NOTE(review): declarations, one condition of the logging check and the
 * return value are elided in this excerpt. */
590 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
591 ImportJob *j = userdata;
/* NOTE(review): divides by dltotal; a guard against dltotal being zero is
 * not visible in this excerpt -- confirm one exists before this line. */
600 percent = ((100 * dlnow) / dltotal);
601 n = now(CLOCK_MONOTONIC);
603 if (n > j->last_status_usec + USEC_PER_SEC &&
604 percent != j->progress_percent &&
606 char buf[FORMAT_TIMESPAN_MAX];
608 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
609 char y[FORMAT_BYTES_MAX];
/* Time left = elapsed * total / received - elapsed (linear extrapolation). */
612 done = n - j->start_usec;
613 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
615 log_info("Got %u%% of %s. %s left at %s/s.",
618 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
619 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
621 log_info("Got %u%% of %s.", percent, j->url);
/* Remember what was reported and when, for the check above. */
623 j->progress_percent = percent;
624 j->last_status_usec = n;
/* Allocates a zero-initialized ImportJob and sets its initial fields: state
 * IMPORT_JOB_INIT, the caller's userdata, an unknown content length, the
 * start timestamp, the size limits and a copy of url.
 * NOTE(review): allocation-failure checks, the remaining field setup and
 * the hand-over to *ret are elided in this excerpt. */
633 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
/* Declared with a cleanup handler (import_job_unrefp). */
634 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
640 j = new0(ImportJob, 1);
644 j->state = IMPORT_JOB_INIT;
646 j->userdata = userdata;
/* (uint64_t) -1 serves as the "length not known" marker checked elsewhere
 * in this file. */
648 j->content_length = (uint64_t) -1;
649 j->start_usec = now(CLOCK_MONOTONIC);
650 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
652 j->url = strdup(url);
/* Starts a job: creates the curl easy handle for j->url, adds an
 * If-None-Match request header built from j->old_etags when that list is
 * not empty, installs the write, header and progress callbacks, registers
 * the handle with the glue object and moves the job to
 * IMPORT_JOB_ANALYZING.
 * NOTE(review): the error returns after each check and the end of the
 * function are not visible in this excerpt. */
662 int import_job_begin(ImportJob *j) {
/* The state is checked against IMPORT_JOB_INIT (consequence not visible). */
667 if (j->state != IMPORT_JOB_INIT)
670 r = curl_glue_make(&j->curl, j->url, j);
674 if (!strv_isempty(j->old_etags)) {
675 _cleanup_free_ char *cc = NULL, *hdr = NULL;
/* Join the known ETags with ", " and prefix the header name. */
677 cc = strv_join(j->old_etags, ", ");
681 hdr = strappend("If-None-Match: ", cc);
/* Create the header list if none exists yet, otherwise append to it. */
685 if (!j->request_header) {
686 j->request_header = curl_slist_new(hdr, NULL);
687 if (!j->request_header)
690 struct curl_slist *l;
692 l = curl_slist_append(j->request_header, hdr);
696 j->request_header = l;
700 if (j->request_header) {
701 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
/* Each callback receives the job itself as its user data pointer. */
705 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
708 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
711 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
714 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
717 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
720 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
/* CURLOPT_NOPROGRESS is set to 0 alongside the progress callback. */
723 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
726 r = curl_glue_add(j->glue, j->curl);
730 j->state = IMPORT_JOB_ANALYZING;