1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2015 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/xattr.h>
25 #include "import-job.h"
/* Destructor: releases every resource an ImportJob owns — the curl easy
 * handle, the custom request-header list, the open disk fd, the active
 * decompressor state, the gcrypt checksum context and the ETag list.
 * NOTE(review): several lines are elided in this view (the NULL guard,
 * the decompressor-teardown calls under the XZ/GZIP branches, the final
 * free()s and the return) — comments cover only what is visible. */
27 ImportJob* import_job_unref(ImportJob *j) {
/* Remove the easy handle from the glue event loop and free it. */
31 curl_glue_remove_and_free(j->glue, j->curl);
/* Free the If-None-Match header list built in import_job_begin(). */
32 curl_slist_free_all(j->request_header);
34 safe_close(j->disk_fd);
/* Tear down whichever decompressor was initialized (bodies elided). */
36 if (j->compressed == IMPORT_JOB_XZ)
38 else if (j->compressed == IMPORT_JOB_GZIP)
41 if (j->checksum_context)
42 gcry_md_close(j->checksum_context);
46 strv_free(j->old_etags);
/* Transitions the job into a terminal state exactly once: DONE on
 * success (logs completion), FAILED otherwise.  Idempotent — a job that
 * already finished is left untouched.
 * NOTE(review): the branch condition selecting success vs. failure
 * (presumably testing 'ret') and the completion-callback invocation are
 * elided in this view — confirm against the full source. */
55 static void import_job_finish(ImportJob *j, int ret) {
/* Already in a terminal state — do nothing. */
58 if (j->state == IMPORT_JOB_DONE ||
59 j->state == IMPORT_JOB_FAILED)
63 j->state = IMPORT_JOB_DONE;
64 log_info("Download of %s complete.", j->url);
66 j->state = IMPORT_JOB_FAILED;
/* Completion callback invoked by the curl glue when a transfer ends.
 * Validates the transfer result and HTTP status, verifies the download
 * was complete, finalizes the SHA256 checksum, fixes up the output file
 * (size, xattrs, timestamps) and finally marks the job finished.
 * NOTE(review): many lines (error 'goto finish' paths, variable
 * declarations, the success assignment to 'r') are elided in this view;
 * comments cover only the visible statements. */
74 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
/* Recover the owning ImportJob stashed in the handle's private data. */
80 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
/* Ignore callbacks for jobs that already reached a terminal state. */
83 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
86 if (result != CURLE_OK) {
87 log_error("Transfer failed: %s", curl_easy_strerror(result));
/* Inspect the HTTP response code. */
92 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
93 if (code != CURLE_OK) {
94 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
/* 304 Not Modified: our cached ETag is still current — nothing to do. */
97 } else if (status == 304) {
98 log_info("Image already downloaded. Skipping download.");
99 j->etag_exists = true;
102 } else if (status >= 300) {
103 log_error("HTTP request to %s failed with code %li.", j->url, status);
106 } else if (status < 200) {
107 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
/* The write callbacks should have moved us to RUNNING by now; if not,
 * the connection ended before any payload arrived. */
112 if (j->state != IMPORT_JOB_RUNNING) {
113 log_error("Premature connection termination.");
/* If the server announced a Content-Length, verify we received it all. */
118 if (j->content_length != (uint64_t) -1 &&
119 j->content_length != j->written_compressed) {
120 log_error("Download truncated.");
/* Finalize the SHA256 digest and keep a hex copy on the job. */
125 if (j->checksum_context) {
128 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
130 log_error("Failed to get checksum.");
135 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
141 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
144 if (j->disk_fd >= 0 && j->allow_sparse) {
145 /* Make sure the file size is right, in case the file was
146 * sparse and we just seeked for the last part */
148 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
149 log_error_errno(errno, "Failed to truncate file: %m");
/* Best-effort: record provenance as xattrs; failures are ignored. */
155 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
157 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
/* Best-effort: propagate the server's Last-Modified time to the file. */
160 struct timespec ut[2];
162 timespec_store(&ut[0], j->mtime);
164 (void) futimens(j->disk_fd, ut);
166 (void) fd_setcrtime(j->disk_fd, j->mtime);
/* Enter the terminal state with the accumulated result code. */
173 import_job_finish(j, r);
/* Sinks 'sz' bytes of (already decompressed) payload: either written to
 * the destination fd, or — when no fd is set — appended to the in-memory
 * payload buffer.  Enforces overflow and maximum-size limits first.
 * NOTE(review): error-return lines and the final 'return 0' are elided
 * in this view. */
176 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
/* Reject if the running total would wrap around (size_t overflow). */
183 if (j->written_uncompressed + sz < j->written_uncompressed) {
184 log_error("File too large, overflow");
/* Enforce the configured uncompressed-size ceiling. */
188 if (j->written_uncompressed + sz > j->uncompressed_max) {
189 log_error("File overly large, refusing");
193 if (j->disk_fd >= 0) {
/* Sparse-aware write when the fd supports seeking (see
 * import_job_open_disk); otherwise a plain write. */
196 n = sparse_write(j->disk_fd, p, sz, 64);
198 n = write(j->disk_fd, p, sz);
200 log_error_errno(errno, "Failed to write file: %m");
203 if ((size_t) n < sz) {
204 log_error("Short write");
/* No disk fd: buffer the payload in memory instead. */
209 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
212 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
213 j->payload_size += sz;
216 j->written_uncompressed += sz;
/* Sinks 'sz' bytes of raw (possibly compressed) wire payload: enforces
 * size limits, folds the bytes into the SHA256 context, then routes the
 * data through the decompressor selected by j->compressed before handing
 * the plain bytes to import_job_write_uncompressed().
 * NOTE(review): error returns, 'break' statements and the xz branch's
 * input-pointer setup are elided in this view. */
221 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
/* Reject if the running total would wrap around (size_t overflow). */
228 if (j->written_compressed + sz < j->written_compressed) {
229 log_error("File too large, overflow");
/* Enforce the configured compressed-size ceiling. */
233 if (j->written_compressed + sz > j->compressed_max) {
234 log_error("File overly large, refusing.");
/* The server must not send more than its announced Content-Length. */
238 if (j->content_length != (uint64_t) -1 &&
239 j->written_compressed + sz > j->content_length) {
240 log_error("Content length incorrect.");
/* The checksum is computed over the compressed wire bytes. */
244 if (j->checksum_context)
245 gcry_md_write(j->checksum_context, p, sz);
247 switch (j->compressed) {
249 case IMPORT_JOB_UNCOMPRESSED:
/* No decompression needed — pass straight through. */
250 r = import_job_write_uncompressed(j, p, sz);
/* XZ: feed the input through lzma_code() in 16k output chunks. */
260 while (j->xz.avail_in > 0) {
261 uint8_t buffer[16 * 1024];
264 j->xz.next_out = buffer;
265 j->xz.avail_out = sizeof(buffer);
267 lzr = lzma_code(&j->xz, LZMA_RUN);
268 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
269 log_error("Decompression error.");
/* Flush however much of the 16k buffer was actually produced. */
273 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
280 case IMPORT_JOB_GZIP:
/* gzip: same chunked pattern via zlib's inflate(). */
282 j->gzip.avail_in = sz;
284 while (j->gzip.avail_in > 0) {
285 uint8_t buffer[16 * 1024];
287 j->gzip.next_out = buffer;
288 j->gzip.avail_out = sizeof(buffer);
290 r = inflate(&j->gzip, Z_NO_FLUSH);
291 if (r != Z_OK && r != Z_STREAM_END) {
292 log_error("Decompression error.");
296 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
304 assert_not_reached("Unknown compression");
/* Account for the raw bytes consumed, whatever the compression. */
307 j->written_compressed += sz;
/* Prepares the output side of the job: lets the caller-provided hook
 * open/choose the destination fd, probes whether that fd is seekable
 * (enabling sparse writes in import_job_write_uncompressed), and sets up
 * the SHA256 context when checksumming was requested.
 * NOTE(review): error returns and the closing 'return 0' are elided in
 * this view. */
312 static int import_job_open_disk(ImportJob *j) {
317 if (j->on_open_disk) {
318 r = j->on_open_disk(j);
323 if (j->disk_fd >= 0) {
324 /* Check if we can do sparse files */
/* Fixed argument order: lseek(fd, offset, whence).  The original
 * passed (fd, SEEK_SET, 0), which only worked because SEEK_SET == 0. */
326 if (lseek(j->disk_fd, 0, SEEK_SET) == 0)
327 j->allow_sparse = true;
/* Seek failed for a reason other than "not seekable" (elided check,
 * presumably errno != ESPIPE) — propagate the error. */
330 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
/* Unseekable fd (e.g. a pipe): fall back to plain sequential writes. */
332 j->allow_sparse = false;
336 if (j->calc_checksum) {
337 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
338 log_error("Failed to initialize hash context.");
/* Sniffs the compression format from the first payload bytes buffered
 * during the ANALYZING state, initializes the matching decompressor,
 * opens the output file, switches the job to RUNNING and replays the
 * buffered bytes through the normal decompression path.
 * NOTE(review): the gzip signature bytes, the stub/payload ownership
 * handoff and the error returns are elided in this view. */
346 static int import_job_detect_compression(ImportJob *j) {
347 static const uint8_t xz_signature[] = {
348 0xfd, '7', 'z', 'X', 'Z', 0x00
350 static const uint8_t gzip_signature[] = {
354 _cleanup_free_ uint8_t *stub = NULL;
/* Need enough bytes to check the longest magic; keep buffering if not. */
361 if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
364 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
365 j->compressed = IMPORT_JOB_XZ;
366 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
367 j->compressed = IMPORT_JOB_GZIP;
369 j->compressed = IMPORT_JOB_UNCOMPRESSED;
371 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
372 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
374 if (j->compressed == IMPORT_JOB_XZ) {
/* No memory cap on the dictionary; tolerate unknown check types. */
377 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
378 if (xzr != LZMA_OK) {
379 log_error("Failed to initialize XZ decoder.");
383 if (j->compressed == IMPORT_JOB_GZIP) {
/* windowBits 15+16 selects gzip-wrapped deflate in zlib. */
384 r = inflateInit2(&j->gzip, 15+16);
386 log_error("Failed to initialize gzip decoder.");
391 r = import_job_open_disk(j);
395 /* Now, take the payload we read so far, and decompress it */
397 stub_size = j->payload_size;
401 j->payload_allocated = 0;
403 j->state = IMPORT_JOB_RUNNING;
405 r = import_job_write_compressed(j, stub, stub_size);
/* libcurl write callback (CURLOPT_WRITEFUNCTION): receives each chunk of
 * response body.  While ANALYZING, data is buffered until the format can
 * be sniffed; while RUNNING, it goes straight through the decompressor.
 * NOTE(review): the enclosing switch statement, success return of 'sz'
 * and 'fail:' label are elided in this view. */
412 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
413 ImportJob *j = userdata;
/* curl reports chunk length as size * nmemb. */
414 size_t sz = size * nmemb;
422 case IMPORT_JOB_ANALYZING:
423 /* Let's first check what it actually is */
425 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
430 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
431 j->payload_size += sz;
/* Attempt the magic-byte sniff; switches state to RUNNING on success. */
433 r = import_job_detect_compression(j);
439 case IMPORT_JOB_RUNNING:
441 r = import_job_write_compressed(j, contents, sz);
447 case IMPORT_JOB_DONE:
448 case IMPORT_JOB_FAILED:
453 assert_not_reached("Impossible state.");
/* Error path: mark the job failed (returning < sz aborts the transfer). */
459 import_job_finish(j, r);
/* libcurl header callback (CURLOPT_HEADERFUNCTION): parses the response
 * headers we care about — ETag (cache-hit detection), Content-Length
 * (size pre-check) and Last-Modified (timestamp for the output file).
 * NOTE(review): the 'etag' declaration, success returns and 'fail:'
 * label are elided in this view. */
463 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
464 ImportJob *j = userdata;
465 size_t sz = size * nmemb;
466 _cleanup_free_ char *length = NULL, *last_modified = NULL;
/* Late headers after the job finished are ignored. */
473 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
/* Headers only arrive before any body data, i.e. while still sniffing. */
478 assert(j->state == IMPORT_JOB_ANALYZING);
480 r = curl_header_strdup(contents, sz, "ETag:", &etag);
/* If the server's ETag matches one we already have, skip the download. */
489 if (strv_contains(j->old_etags, j->etag)) {
490 log_info("Image already downloaded. Skipping download.");
491 j->etag_exists = true;
492 import_job_finish(j, 0);
499 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
/* Best-effort parse; content_length stays (uint64_t) -1 on failure. */
505 (void) safe_atou64(length, &j->content_length);
507 if (j->content_length != (uint64_t) -1) {
508 char bytes[FORMAT_BYTES_MAX];
/* Reject up-front if the announced size exceeds our ceiling. */
510 if (j->content_length > j->compressed_max) {
511 log_error("Content too large.");
516 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
522 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
/* Best-effort: a malformed date just leaves mtime unset. */
528 (void) curl_parse_http_time(last_modified, &j->mtime);
/* Error path: mark the job failed (returning < sz aborts the transfer). */
535 import_job_finish(j, r);
/* libcurl progress callback (CURLOPT_XFERINFOFUNCTION): logs download
 * progress, throttled to at most once per second and only when the
 * percentage actually changed.  After the first second it also estimates
 * remaining time and average throughput.
 * NOTE(review): the dltotal <= 0 guard and declarations of 'percent',
 * 'n', 'done', 'left' are elided in this view — confirm the zero-total
 * case is handled before the division below. */
539 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
540 ImportJob *j = userdata;
549 percent = ((100 * dlnow) / dltotal);
550 n = now(CLOCK_MONOTONIC);
/* Rate-limit: at least 1s since last report and a new percentage. */
552 if (n > j->last_status_usec + USEC_PER_SEC &&
553 percent != j->progress_percent &&
555 char buf[FORMAT_TIMESPAN_MAX];
/* Only estimate ETA once we have >1s of history and some data. */
557 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
558 char y[FORMAT_BYTES_MAX];
561 done = n - j->start_usec;
/* Linear extrapolation: total_time = elapsed * total / received. */
562 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
564 log_info("Got %u%% of %s. %s left at %s/s.",
567 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
568 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
570 log_info("Got %u%% of %s.", percent, j->url);
572 j->progress_percent = percent;
573 j->last_status_usec = n;
/* Allocates a new ImportJob for 'url' in the INIT state with defaults:
 * unknown content length, an 8GB ceiling on both compressed and
 * uncompressed size, and the start timestamp taken now.
 * NOTE(review): argument asserts, OOM checks, the glue assignment and
 * the ownership transfer to *ret are elided in this view. */
579 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
/* Auto-freed on any early error return; disarmed on success (elided). */
580 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
586 j = new0(ImportJob, 1);
590 j->state = IMPORT_JOB_INIT;
592 j->userdata = userdata;
/* (uint64_t) -1 is the "unknown length" sentinel checked elsewhere. */
594 j->content_length = (uint64_t) -1;
595 j->start_usec = now(CLOCK_MONOTONIC);
596 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
598 j->url = strdup(url);
/* Starts the transfer: builds the curl easy handle for j->url, attaches
 * an If-None-Match header from the previously seen ETags, wires up the
 * write/header/progress callbacks and hands the handle to the glue event
 * loop, moving the job into the ANALYZING state.
 * NOTE(review): error returns (EIO/ENOMEM), and the function's tail past
 * the final state assignment, are elided in this view. */
608 int import_job_begin(ImportJob *j) {
/* Only a freshly created job may be started. */
613 if (j->state != IMPORT_JOB_INIT)
616 r = curl_glue_make(&j->curl, j->url, j);
/* Offer all known ETags so the server can answer 304 Not Modified. */
620 if (!strv_isempty(j->old_etags)) {
621 _cleanup_free_ char *cc = NULL, *hdr = NULL;
623 cc = strv_join(j->old_etags, ", ");
627 hdr = strappend("If-None-Match: ", cc);
/* Keep the slist on the job: curl only borrows it, we must free it. */
631 j->request_header = curl_slist_new(hdr, NULL);
632 if (!j->request_header)
635 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
639 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
642 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
645 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
648 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
651 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
654 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
/* Progress reporting is off by default in curl; enable it. */
657 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
660 r = curl_glue_add(j->glue, j->curl);
664 j->state = IMPORT_JOB_ANALYZING;