1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2015 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/xattr.h>
25 #include "import-job.h"
/* Tears down an ImportJob. The visible statements hand the curl handle to
 * curl_glue_remove_and_free(), the request header list to
 * curl_slist_free_all(), disk_fd to safe_close(), the hash context to
 * gcry_md_close() and old_etags to strv_free().
 *
 * NOTE(review): the statements executed for the IMPORT_JOB_XZ and
 * IMPORT_JOB_GZIP branches, any NULL check on j, and the return value are
 * not visible in this excerpt. */
27 ImportJob* import_job_unref(ImportJob *j) {
31 curl_glue_remove_and_free(j->glue, j->curl);
32 curl_slist_free_all(j->request_header);
34 safe_close(j->disk_fd);
/* Decoder state is released according to the detected compression type. */
36 if (j->compressed == IMPORT_JOB_XZ)
38 else if (j->compressed == IMPORT_JOB_GZIP)
42 gcry_md_close(j->hash_context);
46 strv_free(j->old_etags);
/* Declares a cleanup helper for ImportJob* built on import_job_unref().
 * Presumably this is what provides import_job_unrefp, which
 * import_job_new() names in its _cleanup_() attribute; verify against the
 * macro definition. */
55 DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
/* Puts the job into one of its two terminal states: IMPORT_JOB_DONE (with
 * an informational log line naming the URL) or IMPORT_JOB_FAILED.
 *
 * NOTE(review): the condition choosing between the two, presumably based
 * on ret, is not visible in this excerpt, and neither is whatever happens
 * after the state has been set. */
57 static void import_job_finish(ImportJob *j, int ret) {
/* A job that is already finished is detected first; presumably this guards
 * an early return so a job is only finished once (confirm). */
60 if (j->state == IMPORT_JOB_DONE ||
61 j->state == IMPORT_JOB_FAILED)
65 j->state = IMPORT_JOB_DONE;
66 log_info("Download of %s complete.", j->url);
68 j->state = IMPORT_JOB_FAILED;
/* Completion handler for a curl transfer. Recovers the ImportJob stored as
 * CURLINFO_PRIVATE on the easy handle, checks the transfer result and the
 * HTTP response code, verifies that the download is complete, derives the
 * SHA256 hex string, applies size, xattr and timestamp metadata to the
 * output file, and ends by calling import_job_finish(j, r).
 *
 * NOTE(review): declarations, error-path jumps and several guarding
 * conditions are not visible in this excerpt. */
76 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
/* The job pointer travels as the handle's private data. */
82 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
/* No job, or a job that already reached a terminal state: nothing more is
 * expected to happen here (the statement taken is not visible). */
85 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
88 if (result != CURLE_OK) {
89 log_error("Transfer failed: %s", curl_easy_strerror(result));
94 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
95 if (code != CURLE_OK) {
96 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
/* 304 is treated as "image already present" and recorded in etag_exists;
 * other codes >= 300 and codes < 200 are logged as errors. */
99 } else if (status == 304) {
100 log_info("Image already downloaded. Skipping download.");
101 j->etag_exists = true;
104 } else if (status >= 300) {
105 log_error("HTTP request to %s failed with code %li.", j->url, status);
108 } else if (status < 200) {
109 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
/* The transfer ended before the job reached the RUNNING state. */
114 if (j->state != IMPORT_JOB_RUNNING) {
115 log_error("Premature connection termination.");
/* (uint64_t) -1 is the initial content_length set in import_job_new();
 * any other value must match the compressed bytes received. */
120 if (j->content_length != (uint64_t) -1 &&
121 j->content_length != j->written_compressed) {
122 log_error("Download truncated.");
/* Read the digest from the hash context and store it hex-encoded. */
127 if (j->hash_context) {
130 k = gcry_md_read(j->hash_context, GCRY_MD_SHA256);
132 log_error("Failed to get checksum.");
137 j->sha256 = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
143 log_debug("SHA256 of %s is %s.", j->url, j->sha256);
146 if (j->disk_fd >= 0 && j->allow_sparse) {
147 /* Make sure the file size is right, in case the file was
148 * sparse and we just seeked for the last part */
150 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
151 log_error_errno(errno, "Failed to truncate file: %m");
/* Metadata updates below discard their return values via (void), so a
 * failure here does not fail the job.
 * NOTE(review): strlen(j->etag) requires j->etag to be non-NULL; the guard
 * for that is not visible in this excerpt, so confirm one exists. */
157 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
159 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
/* Timestamps are derived from j->mtime. */
162 struct timespec ut[2];
164 timespec_store(&ut[0], j->mtime);
166 (void) futimens(j->disk_fd, ut);
168 (void) fd_setcrtime(j->disk_fd, j->mtime);
175 import_job_finish(j, r);
/* Stores sz bytes of decoded data found at p. With an open disk_fd the data
 * goes to that descriptor; otherwise it is appended to the in-memory
 * payload buffer. On the visible path j->written_uncompressed grows by sz.
 *
 * NOTE(review): return statements are not visible in this excerpt. */
178 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
/* Unsigned wraparound check on the running byte counter. */
185 if (j->written_uncompressed + sz < j->written_uncompressed) {
186 log_error("File too large, overflow");
/* Enforce the configured upper bound on decoded size. */
190 if (j->written_uncompressed + sz > j->uncompressed_max) {
191 log_error("File overly large, refusing");
195 if (j->disk_fd >= 0) {
/* Two write paths exist, sparse_write() and plain write(); presumably
 * j->allow_sparse selects between them (the condition is not visible). */
198 n = sparse_write(j->disk_fd, p, sz, 64);
200 n = write(j->disk_fd, p, sz);
202 log_error_errno(errno, "Failed to write file: %m");
/* A partial write is reported as an error instead of being retried. */
205 if ((size_t) n < sz) {
206 log_error("Short write");
/* No file descriptor: grow the payload buffer and append. */
211 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
214 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
215 j->payload_size += sz;
218 j->written_uncompressed += sz;
/* Consumes sz bytes of the raw download stream found at p. The running
 * compressed byte count is checked against wraparound, against
 * j->compressed_max and against j->content_length when that is set. The
 * bytes are fed to the hash context and then, depending on j->compressed,
 * passed on unchanged or run through the XZ (lzma_code) or gzip (inflate)
 * decoder, with the output going to import_job_write_uncompressed().
 * On the visible path j->written_compressed grows by sz.
 *
 * NOTE(review): return statements, the IMPORT_JOB_XZ case label and the
 * decoder input setup for XZ are not visible in this excerpt. */
223 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
/* Unsigned wraparound check on the running byte counter. */
230 if (j->written_compressed + sz < j->written_compressed) {
231 log_error("File too large, overflow");
235 if (j->written_compressed + sz > j->compressed_max) {
236 log_error("File overly large, refusing.");
/* More data than the announced Content-Length is an error. */
240 if (j->content_length != (uint64_t) -1 &&
241 j->written_compressed + sz > j->content_length) {
242 log_error("Content length incorrect.");
/* The hash covers the bytes as received, before any decoding. */
247 gcry_md_write(j->hash_context, p, sz);
249 switch (j->compressed) {
251 case IMPORT_JOB_UNCOMPRESSED:
252 r = import_job_write_uncompressed(j, p, sz);
/* XZ: decode in 16 KiB output chunks until all input is consumed. */
262 while (j->xz.avail_in > 0) {
263 uint8_t buffer[16 * 1024];
266 j->xz.next_out = buffer;
267 j->xz.avail_out = sizeof(buffer);
269 lzr = lzma_code(&j->xz, LZMA_RUN);
270 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
271 log_error("Decompression error.");
/* Bytes produced this round = buffer size minus remaining output space. */
275 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
282 case IMPORT_JOB_GZIP:
284 j->gzip.avail_in = sz;
/* gzip: same chunked loop, using zlib's inflate(). */
286 while (j->gzip.avail_in > 0) {
287 uint8_t buffer[16 * 1024];
289 j->gzip.next_out = buffer;
290 j->gzip.avail_out = sizeof(buffer);
292 r = inflate(&j->gzip, Z_NO_FLUSH);
293 if (r != Z_OK && r != Z_STREAM_END) {
294 log_error("Decompression error.");
298 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
306 assert_not_reached("Unknown compression");
309 j->written_compressed += sz;
/* Prepares the output side of the job: invokes the optional on_open_disk
 * hook, probes whether disk_fd can be seeked to decide whether sparse
 * writes are allowed, and opens a SHA256 hash context.
 *
 * NOTE(review): error returns and the conditions around the second
 * allow_sparse assignment are not visible in this excerpt. */
314 static int import_job_open_disk(ImportJob *j) {
/* Optional hook supplied by the job's owner. */
319 if (j->on_open_disk) {
320 r = j->on_open_disk(j);
325 if (j->disk_fd >= 0) {
326 /* Check if we can do sparse files */
/* NOTE(review): POSIX declares lseek(fd, offset, whence), so offset and
 * whence look transposed here. The call only means "seek to offset 0 from
 * the start" where SEEK_SET is defined as 0 (as on Linux); consider
 * swapping the arguments. */
328 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
329 j->allow_sparse = true;
332 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
334 j->allow_sparse = false;
339 if (gcry_md_open(&j->hash_context, GCRY_MD_SHA256, 0) != 0) {
340 log_error("Failed to initialize hash context.");
/* Decides how the incoming stream is encoded by comparing the start of the
 * buffered payload with the XZ and gzip signatures, initializes the
 * matching decoder, opens the output via import_job_open_disk(), switches
 * the job to IMPORT_JOB_RUNNING and replays the buffered bytes through
 * import_job_write_compressed().
 *
 * NOTE(review): return statements, the gzip signature bytes and the lines
 * that move j->payload into stub are not visible in this excerpt. */
348 static int import_job_detect_compression(ImportJob *j) {
349 static const uint8_t xz_signature[] = {
350 0xfd, '7', 'z', 'X', 'Z', 0x00
352 static const uint8_t gzip_signature[] = {
356 _cleanup_free_ uint8_t *stub = NULL;
/* Not enough bytes buffered yet to compare against the longer signature;
 * presumably the function returns here and is retried when more data has
 * arrived (confirm). */
363 if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
366 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
367 j->compressed = IMPORT_JOB_XZ;
368 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
369 j->compressed = IMPORT_JOB_GZIP;
371 j->compressed = IMPORT_JOB_UNCOMPRESSED;
373 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
374 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
376 if (j->compressed == IMPORT_JOB_XZ) {
/* The second argument is liblzma's memory usage limit; UINT64_MAX leaves
 * it effectively unlimited. */
379 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
380 if (xzr != LZMA_OK) {
381 log_error("Failed to initialize XZ decoder.");
385 if (j->compressed == IMPORT_JOB_GZIP) {
/* windowBits 15+16: per the zlib manual, adding 16 selects gzip framing. */
386 r = inflateInit2(&j->gzip, 15+16);
388 log_error("Failed to initialize gzip decoder.");
393 r = import_job_open_disk(j);
397 /* Now, take the payload we read so far, and decompress it */
399 stub_size = j->payload_size;
403 j->payload_allocated = 0;
/* The state changes before the replay, so the replayed bytes are handled
 * as regular stream data. */
405 j->state = IMPORT_JOB_RUNNING;
407 r = import_job_write_compressed(j, stub, stub_size);
/* Body-data callback registered with CURLOPT_WRITEFUNCTION in
 * import_job_begin(); userdata is the ImportJob. While the job is
 * ANALYZING, data is buffered in j->payload and compression detection is
 * attempted; while RUNNING, data goes straight to
 * import_job_write_compressed(). The visible tail calls
 * import_job_finish(j, r), presumably on the failure path.
 *
 * NOTE(review): the switch statement itself, the return values and the
 * handling of the DONE/FAILED cases are not visible in this excerpt. */
414 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
415 ImportJob *j = userdata;
/* Total number of bytes delivered by this call. */
416 size_t sz = size * nmemb;
424 case IMPORT_JOB_ANALYZING:
425 /* Let's first check what it actually is */
427 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
432 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
433 j->payload_size += sz;
435 r = import_job_detect_compression(j);
441 case IMPORT_JOB_RUNNING:
443 r = import_job_write_compressed(j, contents, sz);
449 case IMPORT_JOB_DONE:
450 case IMPORT_JOB_FAILED:
455 assert_not_reached("Impossible state.");
461 import_job_finish(j, r);
/* Header callback registered with CURLOPT_HEADERFUNCTION in
 * import_job_begin(); userdata is the ImportJob. Each call is matched
 * against the ETag, Content-Length and Last-Modified headers:
 *  - an ETag already listed in j->old_etags finishes the job with
 *    etag_exists set;
 *  - Content-Length is stored in j->content_length and rejected when it
 *    exceeds j->compressed_max;
 *  - Last-Modified is parsed into j->mtime.
 * The visible tail calls import_job_finish(j, r), presumably on the
 * failure path.
 *
 * NOTE(review): return values, the declaration of etag, its transfer into
 * j->etag and the checks on r after each curl_header_strdup() call are not
 * visible in this excerpt. */
465 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
466 ImportJob *j = userdata;
467 size_t sz = size * nmemb;
468 _cleanup_free_ char *length = NULL, *last_modified = NULL;
/* Headers arriving after the job has finished get separate handling (the
 * statement taken is not visible). */
475 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
480 assert(j->state == IMPORT_JOB_ANALYZING);
482 r = curl_header_strdup(contents, sz, "ETag:", &etag);
/* A known ETag means the image is already present: finish successfully. */
491 if (strv_contains(j->old_etags, j->etag)) {
492 log_info("Image already downloaded. Skipping download.");
493 j->etag_exists = true;
494 import_job_finish(j, 0);
501 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
/* Parse result is discarded via (void); on failure content_length
 * presumably keeps its previous value (confirm safe_atou64() semantics). */
507 (void) safe_atou64(length, &j->content_length);
509 if (j->content_length != (uint64_t) -1) {
510 char bytes[FORMAT_BYTES_MAX];
512 if (j->content_length > j->compressed_max) {
513 log_error("Content too large.");
518 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
524 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
/* Parse result is discarded via (void), so a bad date does not fail the
 * job. */
530 (void) curl_parse_http_time(last_modified, &j->mtime);
537 import_job_finish(j, r);
/* Transfer-progress callback registered with CURLOPT_XFERINFOFUNCTION in
 * import_job_begin(); userdata is the ImportJob. Computes the download
 * percentage and logs it, together with an estimate of remaining time and
 * throughput once more than a second has elapsed since j->start_usec.
 * The conditions visible here require more than USEC_PER_SEC since the
 * last report and a changed percentage.
 *
 * NOTE(review): declarations, the return value and part of the logging
 * condition are not visible in this excerpt. */
541 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
542 ImportJob *j = userdata;
/* NOTE(review): this divides by dltotal. A guard against dltotal <= 0 is
 * not visible in this excerpt; confirm one precedes this line. */
551 percent = ((100 * dlnow) / dltotal);
552 n = now(CLOCK_MONOTONIC);
554 if (n > j->last_status_usec + USEC_PER_SEC &&
555 percent != j->progress_percent &&
557 char buf[FORMAT_TIMESPAN_MAX];
559 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
560 char y[FORMAT_BYTES_MAX];
/* done = elapsed time; left = projected total time at the current average
 * rate, minus the time already spent. */
563 done = n - j->start_usec;
564 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
566 log_info("Got %u%% of %s. %s left at %s/s.",
569 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
570 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
572 log_info("Got %u%% of %s.", percent, j->url);
/* Remember what was reported and when, for the rate limiting above. */
574 j->progress_percent = percent;
575 j->last_status_usec = n;
/* Allocates a zero-initialized ImportJob and fills in its defaults: state
 * IMPORT_JOB_INIT, the caller's userdata, content_length of (uint64_t) -1,
 * the current monotonic time as start_usec, an 8 GiB limit for both
 * compressed and uncompressed size, and a private copy of url.
 *
 * NOTE(review): allocation-failure handling, the use of the glue parameter,
 * the assignment to *ret and the return value are not visible in this
 * excerpt. */
581 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
/* The cleanup attribute names import_job_unrefp, so the job is presumably
 * released automatically on early-exit paths. */
582 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
588 j = new0(ImportJob, 1);
592 j->state = IMPORT_JOB_INIT;
594 j->userdata = userdata;
/* (uint64_t) -1 is the value other functions compare content_length
 * against before trusting it. */
596 j->content_length = (uint64_t) -1;
597 j->start_usec = now(CLOCK_MONOTONIC);
598 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
600 j->url = strdup(url);
/* Starts a job that is still in IMPORT_JOB_INIT: creates the curl handle
 * for j->url, adds an If-None-Match request header when old ETags are
 * known, installs the write, header and progress callbacks with the job as
 * their user data, registers the handle with the glue object and moves the
 * job to IMPORT_JOB_ANALYZING.
 *
 * NOTE(review): the error returns after each check and the end of the
 * function are not visible in this excerpt. */
610 int import_job_begin(ImportJob *j) {
615 if (j->state != IMPORT_JOB_INIT)
618 r = curl_glue_make(&j->curl, j->url, j);
/* Build "If-None-Match: <etag>, <etag>, ..." from the known ETags; a 304
 * reply is handled in import_job_curl_on_finished(). */
622 if (!strv_isempty(j->old_etags)) {
623 _cleanup_free_ char *cc = NULL, *hdr = NULL;
625 cc = strv_join(j->old_etags, ", ");
629 hdr = strappend("If-None-Match: ", cc);
633 j->request_header = curl_slist_new(hdr, NULL);
634 if (!j->request_header)
637 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
641 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
644 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
647 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
650 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
653 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
656 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
/* libcurl only invokes the progress callback when NOPROGRESS is 0. */
659 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
662 r = curl_glue_add(j->glue, j->curl);
666 j->state = IMPORT_JOB_ANALYZING;