1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2015 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/xattr.h>
25 #include "machine-pool.h"
26 #include "import-job.h"
28 /* Grow the /var/lib/machines directory after each 10MiB written */
29 #define IMPORT_GROW_INTERVAL_BYTES (UINT64_C(10) * UINT64_C(1024) * UINT64_C(1024))
/* Destructor for an ImportJob: detaches and frees the curl easy handle,
 * the custom request-header list, the open disk fd, whichever decompressor
 * state was initialized (xz/gzip/bzip2), the gcrypt checksum context and
 * the remembered ETag list.
 * NOTE(review): interior lines are elided in this view — the XZ and GZIP
 * branches presumably call lzma_end()/inflateEnd(); confirm in full source. */
31 ImportJob* import_job_unref(ImportJob *j) {
35 curl_glue_remove_and_free(j->glue, j->curl);
36 curl_slist_free_all(j->request_header);
/* safe_close() tolerates -1, so an unopened disk fd is fine here. */
38 safe_close(j->disk_fd);
/* Tear down exactly the decompressor that was set up for this stream. */
40 if (j->compressed == IMPORT_JOB_XZ)
42 else if (j->compressed == IMPORT_JOB_GZIP)
44 else if (j->compressed == IMPORT_JOB_BZIP2)
45 BZ2_bzDecompressEnd(&j->bzip2);
47 if (j->checksum_context)
48 gcry_md_close(j->checksum_context);
52 strv_free(j->old_etags);
/* Move the job into a terminal state.  ret == 0 marks the transfer DONE
 * (progress forced to 100%); a non-zero ret marks it FAILED.  Idempotent:
 * bails out early if the job already reached a terminal state. */
61 static void import_job_finish(ImportJob *j, int ret) {
64 if (j->state == IMPORT_JOB_DONE ||
65 j->state == IMPORT_JOB_FAILED)
69 j->state = IMPORT_JOB_DONE;
70 j->progress_percent = 100;
71 log_info("Download of %s complete.", j->url);
/* Failure path (elided branch): record terminal FAILED state. */
73 j->state = IMPORT_JOB_FAILED;
/* curl completion callback, invoked by the CurlGlue machinery when a
 * transfer ends.  Validates the curl result and HTTP status, detects
 * truncated downloads, finalizes the SHA256 checksum, fixes up the output
 * file (size, xattrs, timestamps) and finally hands off to
 * import_job_finish() with the computed result code. */
81 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
/* Recover the ImportJob pointer we stashed in the easy handle. */
87 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
/* Ignore stale callbacks for jobs that already reached a terminal state. */
90 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
93 if (result != CURLE_OK) {
94 log_error("Transfer failed: %s", curl_easy_strerror(result));
99 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
100 if (code != CURLE_OK) {
101 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
/* 304 Not Modified: our If-None-Match ETag matched, nothing to download. */
104 } else if (status == 304) {
105 log_info("Image already downloaded. Skipping download.");
106 j->etag_exists = true;
109 } else if (status >= 300) {
110 log_error("HTTP request to %s failed with code %li.", j->url, status);
113 } else if (status < 200) {
114 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
/* A successful 2xx must have moved us past header analysis into RUNNING. */
119 if (j->state != IMPORT_JOB_RUNNING) {
120 log_error("Premature connection termination.");
/* If the server announced a Content-Length, the bytes we actually consumed
 * must match it — otherwise the download was truncated. */
125 if (j->content_length != (uint64_t) -1 &&
126 j->content_length != j->written_compressed) {
127 log_error("Download truncated.");
/* Finalize and hex-format the SHA256 digest, if checksumming was on. */
132 if (j->checksum_context) {
135 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
137 log_error("Failed to get checksum.");
142 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
148 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
151 if (j->disk_fd >= 0 && j->allow_sparse) {
152 /* Make sure the file size is right, in case the file was
153 * sparse and we just seeked for the last part */
155 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
156 log_error_errno(errno, "Failed to truncate file: %m");
/* Best-effort metadata: remember source ETag/URL as xattrs, and stamp the
 * file with the server-provided mtime.  Failures are deliberately ignored. */
162 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
164 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
167 struct timespec ut[2];
169 timespec_store(&ut[0], j->mtime);
171 (void) futimens(j->disk_fd, ut);
173 (void) fd_setcrtime(j->disk_fd, j->mtime);
180 import_job_finish(j, r);
/* Sink for decompressed bytes.  Enforces overflow and size-quota limits,
 * then either writes to the open disk fd (sparse-aware when allowed) or
 * accumulates into the in-memory payload buffer.  Also periodically grows
 * the /var/lib/machines pool while writing to disk. */
183 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
/* Guard against uint64 wrap-around of the running byte counter. */
192 if (j->written_uncompressed + sz < j->written_uncompressed) {
193 log_error("File too large, overflow");
/* Enforce the configured uncompressed size quota. */
197 if (j->written_uncompressed + sz > j->uncompressed_max) {
198 log_error("File overly large, refusing");
202 if (j->disk_fd >= 0) {
/* Re-grow the machine pool every IMPORT_GROW_INTERVAL_BYTES written. */
204 if (j->grow_machine_directory && j->written_since_last_grow >= IMPORT_GROW_INTERVAL_BYTES) {
205 j->written_since_last_grow = 0;
206 grow_machine_directory();
/* Sparse path punches holes for zero runs (64-byte granularity);
 * otherwise fall back to a plain write. */
210 n = sparse_write(j->disk_fd, p, sz, 64);
212 n = write(j->disk_fd, p, sz);
214 log_error_errno(errno, "Failed to write file: %m");
217 if ((size_t) n < sz) {
218 log_error("Short write");
/* No disk fd: buffer the data in memory instead. */
223 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
226 memcpy(j->payload + j->payload_size, p, sz);
227 j->payload_size += sz;
230 j->written_uncompressed += sz;
231 j->written_since_last_grow += sz;
/* Sink for raw (possibly compressed) wire bytes.  Applies overflow, quota
 * and Content-Length consistency checks, feeds the data to the checksum
 * context, then dispatches on the detected compression format, pushing
 * 16 KiB decompressed chunks into import_job_write_uncompressed(). */
236 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
/* Guard against uint64 wrap-around of the compressed byte counter. */
245 if (j->written_compressed + sz < j->written_compressed) {
246 log_error("File too large, overflow");
250 if (j->written_compressed + sz > j->compressed_max) {
251 log_error("File overly large, refusing.");
/* The server must not send more than the Content-Length it announced. */
255 if (j->content_length != (uint64_t) -1 &&
256 j->written_compressed + sz > j->content_length) {
257 log_error("Content length incorrect.");
/* The checksum covers the compressed (on-the-wire) representation. */
261 if (j->checksum_context)
262 gcry_md_write(j->checksum_context, p, sz);
264 switch (j->compressed) {
266 case IMPORT_JOB_UNCOMPRESSED:
267 r = import_job_write_uncompressed(j, p, sz);
/* XZ: loop until the whole input buffer has been consumed. */
277 while (j->xz.avail_in > 0) {
278 uint8_t buffer[16 * 1024];
281 j->xz.next_out = buffer;
282 j->xz.avail_out = sizeof(buffer);
284 lzr = lzma_code(&j->xz, LZMA_RUN);
285 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
286 log_error("Decompression error.");
/* avail_out counts down, so this is the number of bytes produced. */
290 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
297 case IMPORT_JOB_GZIP:
299 j->gzip.avail_in = sz;
301 while (j->gzip.avail_in > 0) {
302 uint8_t buffer[16 * 1024];
304 j->gzip.next_out = buffer;
305 j->gzip.avail_out = sizeof(buffer);
307 r = inflate(&j->gzip, Z_NO_FLUSH);
308 if (r != Z_OK && r != Z_STREAM_END) {
309 log_error("Decompression error.");
313 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
320 case IMPORT_JOB_BZIP2:
321 j->bzip2.next_in = p;
322 j->bzip2.avail_in = sz;
324 while (j->bzip2.avail_in > 0) {
325 uint8_t buffer[16 * 1024];
/* bzlib uses char* rather than uint8_t*, hence the cast. */
327 j->bzip2.next_out = (char*) buffer;
328 j->bzip2.avail_out = sizeof(buffer);
330 r = BZ2_bzDecompress(&j->bzip2);
331 if (r != BZ_OK && r != BZ_STREAM_END) {
332 log_error("Decompression error.");
336 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->bzip2.avail_out);
344 assert_not_reached("Unknown compression");
347 j->written_compressed += sz;
352 static int import_job_open_disk(ImportJob *j) {
357 if (j->on_open_disk) {
358 r = j->on_open_disk(j);
363 if (j->disk_fd >= 0) {
364 /* Check if we can do sparse files */
366 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
367 j->allow_sparse = true;
370 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
372 j->allow_sparse = false;
376 if (j->calc_checksum) {
377 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
378 log_error("Failed to initialize hash context.");
/* Inspect the first payload bytes buffered during the ANALYZING phase,
 * identify the compression format by magic signature, initialize the
 * matching decoder, open the output target, switch the job to RUNNING and
 * replay the buffered bytes through the compressed-write path. */
386 static int import_job_detect_compression(ImportJob *j) {
387 static const uint8_t xz_signature[] = {
388 0xfd, '7', 'z', 'X', 'Z', 0x00
390 static const uint8_t gzip_signature[] = {
393 static const uint8_t bzip2_signature[] = {
397 _cleanup_free_ uint8_t *stub = NULL;
/* Wait until enough bytes arrived to check the longest signature. */
404 if (j->payload_size < MAX3(sizeof(xz_signature),
405 sizeof(gzip_signature),
406 sizeof(bzip2_signature)))
409 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
410 j->compressed = IMPORT_JOB_XZ;
411 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
412 j->compressed = IMPORT_JOB_GZIP;
413 else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0)
414 j->compressed = IMPORT_JOB_BZIP2;
416 j->compressed = IMPORT_JOB_UNCOMPRESSED;
418 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
419 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
420 log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == IMPORT_JOB_BZIP2));
422 if (j->compressed == IMPORT_JOB_XZ) {
425 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
426 if (xzr != LZMA_OK) {
427 log_error("Failed to initialize XZ decoder.");
431 if (j->compressed == IMPORT_JOB_GZIP) {
/* windowBits 15+16 tells zlib to expect a gzip (not raw zlib) header. */
432 r = inflateInit2(&j->gzip, 15+16);
434 log_error("Failed to initialize gzip decoder.");
438 if (j->compressed == IMPORT_JOB_BZIP2) {
439 r = BZ2_bzDecompressInit(&j->bzip2, 0, 0);
441 log_error("Failed to initialize bzip2 decoder.");
446 r = import_job_open_disk(j);
450 /* Now, take the payload we read so far, and decompress it */
452 stub_size = j->payload_size;
456 j->payload_allocated = 0;
458 j->state = IMPORT_JOB_RUNNING;
460 r = import_job_write_compressed(j, stub, stub_size);
/* curl CURLOPT_WRITEFUNCTION callback: routes incoming body data according
 * to the job state — buffer and sniff while ANALYZING, stream through the
 * decompressor while RUNNING, drop data in terminal states.  On error it
 * fails the job via import_job_finish(). */
467 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
468 ImportJob *j = userdata;
/* Per curl convention the chunk size is size * nmemb bytes. */
469 size_t sz = size * nmemb;
477 case IMPORT_JOB_ANALYZING:
478 /* Let's first check what it actually is */
480 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
485 memcpy(j->payload + j->payload_size, contents, sz);
486 j->payload_size += sz;
488 r = import_job_detect_compression(j);
494 case IMPORT_JOB_RUNNING:
496 r = import_job_write_compressed(j, contents, sz);
/* Terminal states: silently ignore any trailing data from curl. */
502 case IMPORT_JOB_DONE:
503 case IMPORT_JOB_FAILED:
508 assert_not_reached("Impossible state.");
514 import_job_finish(j, r);
/* curl CURLOPT_HEADERFUNCTION callback: parses one response header line.
 * Extracts ETag (short-circuiting if it matches a previously seen one),
 * Content-Length (checked against the compressed-size quota) and
 * Last-Modified, then forwards the raw header to the job's on_header hook. */
518 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
519 ImportJob *j = userdata;
520 size_t sz = size * nmemb;
521 _cleanup_free_ char *length = NULL, *last_modified = NULL;
528 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
/* Headers only arrive before any body data, i.e. while still analyzing. */
533 assert(j->state == IMPORT_JOB_ANALYZING);
535 r = curl_header_strdup(contents, sz, "ETag:", &etag);
/* If we downloaded this exact ETag before, finish early with success. */
544 if (strv_contains(j->old_etags, j->etag)) {
545 log_info("Image already downloaded. Skipping download.");
546 j->etag_exists = true;
547 import_job_finish(j, 0);
554 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
/* Best-effort parse; content_length stays (uint64_t) -1 on failure. */
560 (void) safe_atou64(length, &j->content_length);
562 if (j->content_length != (uint64_t) -1) {
563 char bytes[FORMAT_BYTES_MAX];
/* Reject up-front if the announced size already exceeds our quota. */
565 if (j->content_length > j->compressed_max) {
566 log_error("Content too large.");
571 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
577 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
583 (void) curl_parse_http_time(last_modified, &j->mtime);
588 r = j->on_header(j, contents, sz);
596 import_job_finish(j, r);
/* curl CURLOPT_XFERINFOFUNCTION callback: logs download progress, rate-
 * limited to at most once per second and only when the percentage changed.
 * Once a second of transfer has elapsed it also estimates time remaining
 * and the average throughput. */
600 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
601 ImportJob *j = userdata;
610 percent = ((100 * dlnow) / dltotal);
611 n = now(CLOCK_MONOTONIC);
/* Throttle: log only if >=1s since the last status and percent moved. */
613 if (n > j->last_status_usec + USEC_PER_SEC &&
614 percent != j->progress_percent &&
616 char buf[FORMAT_TIMESPAN_MAX];
618 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
619 char y[FORMAT_BYTES_MAX];
/* Linear ETA: scale elapsed time by total/done ratio, minus elapsed. */
622 done = n - j->start_usec;
623 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
625 log_info("Got %u%% of %s. %s left at %s/s.",
628 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
629 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
631 log_info("Got %u%% of %s.", percent, j->url);
633 j->progress_percent = percent;
634 j->last_status_usec = n;
/* Allocate and initialize a new ImportJob for the given URL, attached to
 * the given CurlGlue event machinery.  Defaults: unknown content length
 * ((uint64_t) -1 sentinel), 8 GiB size quota for both the compressed and
 * uncompressed streams.  Ownership of *ret transfers to the caller
 * (release with import_job_unref()). */
643 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
644 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
650 j = new0(ImportJob, 1);
654 j->state = IMPORT_JOB_INIT;
656 j->userdata = userdata;
658 j->content_length = (uint64_t) -1;
/* Record the start time now so the progress callback can compute ETA. */
659 j->start_usec = now(CLOCK_MONOTONIC);
660 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
662 j->url = strdup(url);
/* Start the transfer: create the curl easy handle, attach an
 * If-None-Match header built from all previously seen ETags, install the
 * write/header/progress callbacks, register with the glue's event loop and
 * switch the job into the ANALYZING state.  Only valid from INIT. */
672 int import_job_begin(ImportJob *j) {
677 if (j->state != IMPORT_JOB_INIT)
/* Pre-grow the machine pool before any data arrives. */
680 if (j->grow_machine_directory)
681 grow_machine_directory();
683 r = curl_glue_make(&j->curl, j->url, j);
687 if (!strv_isempty(j->old_etags)) {
688 _cleanup_free_ char *cc = NULL, *hdr = NULL;
/* Combine every known ETag into one If-None-Match header so the server
 * can answer 304 if any of them still matches. */
690 cc = strv_join(j->old_etags, ", ");
694 hdr = strappend("If-None-Match: ", cc);
698 if (!j->request_header) {
699 j->request_header = curl_slist_new(hdr, NULL);
700 if (!j->request_header)
703 struct curl_slist *l;
705 l = curl_slist_append(j->request_header, hdr);
709 j->request_header = l;
713 if (j->request_header) {
714 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
718 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
721 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
724 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
727 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
730 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
733 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
/* curl disables progress callbacks by default; NOPROGRESS=0 enables them. */
736 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
739 r = curl_glue_add(j->glue, j->curl);
743 j->state = IMPORT_JOB_ANALYZING;