1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2015 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/xattr.h>
25 #include "import-job.h"
/* Tears down an ImportJob. The visible steps hand the curl handle back to the
 * glue object, free the request header list, close the disk file descriptor,
 * dispatch on j->compressed to shut down a decompressor, close the checksum
 * context if one exists, and free the list of old ETags.
 * NOTE(review): presumably returns NULL so callers can reset their pointer in
 * one statement -- confirm against the return statement. */
27 ImportJob* import_job_unref(ImportJob *j) {
31 curl_glue_remove_and_free(j->glue, j->curl);
32 curl_slist_free_all(j->request_header);
34 safe_close(j->disk_fd);
/* One branch per compression type; the bzip2 branch releases the bzip2
 * stream state. */
36 if (j->compressed == IMPORT_JOB_XZ)
38 else if (j->compressed == IMPORT_JOB_GZIP)
40 else if (j->compressed == IMPORT_JOB_BZIP2)
41 BZ2_bzDecompressEnd(&j->bzip2);
/* The checksum context is only closed when one was actually opened. */
43 if (j->checksum_context)
44 gcry_md_close(j->checksum_context);
48 strv_free(j->old_etags);
/* Moves a job into one of its two terminal states, IMPORT_JOB_DONE (logging
 * that the download of j->url is complete) or IMPORT_JOB_FAILED.
 * NOTE(review): presumably 'ret' selects between the two outcomes and is
 * recorded on the job -- confirm. */
57 static void import_job_finish(ImportJob *j, int ret) {
/* Detects a job that is already in a terminal state. */
60 if (j->state == IMPORT_JOB_DONE ||
61 j->state == IMPORT_JOB_FAILED)
65 j->state = IMPORT_JOB_DONE;
66 log_info("Download of %s complete.", j->url);
68 j->state = IMPORT_JOB_FAILED;
/* Handles the end of a curl transfer.
 *
 * g:      the curl glue object the handle belongs to.
 * curl:   the easy handle; its CURLINFO_PRIVATE pointer is read back as the
 *         ImportJob.
 * result: the transfer's CURLcode.
 *
 * Validates the transfer result, the HTTP status, the job state and the
 * received length, then finalizes checksum and on-disk metadata, and ends by
 * calling import_job_finish(j, r). */
76 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
82 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
/* Detects a handle without a job, or a job already in a terminal state. */
85 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
88 if (result != CURLE_OK) {
89 log_error("Transfer failed: %s", curl_easy_strerror(result));
94 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
95 if (code != CURLE_OK) {
96 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
/* HTTP 304: nothing new to download; remember that the ETag matched. */
99 } else if (status == 304) {
100 log_info("Image already downloaded. Skipping download.");
101 j->etag_exists = true;
/* Status codes of 300 and above (other than 304) and below 200 are both
 * logged as errors. */
104 } else if (status >= 300) {
105 log_error("HTTP request to %s failed with code %li.", j->url, status);
108 } else if (status < 200) {
109 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
/* A job that is not in the RUNNING state here is reported as a premature
 * connection termination. */
114 if (j->state != IMPORT_JOB_RUNNING) {
115 log_error("Premature connection termination.");
/* A content_length of (uint64_t) -1 is exempt from this check; any other
 * value must equal the number of compressed bytes received. */
120 if (j->content_length != (uint64_t) -1 &&
121 j->content_length != j->written_compressed) {
122 log_error("Download truncated.");
/* With a checksum context present, read the SHA256 digest and keep its hex
 * form in j->checksum. */
127 if (j->checksum_context) {
130 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
132 log_error("Failed to get checksum.");
137 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
143 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
146 if (j->disk_fd >= 0 && j->allow_sparse) {
147 /* Make sure the file size is right, in case the file was
148 * sparse and we just seeked for the last part */
150 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
151 log_error_errno(errno, "Failed to truncate file: %m");
/* Record ETag and URL as extended attributes on the file; the results are
 * deliberately ignored.
 * NOTE(review): strlen(j->etag) needs a non-NULL etag -- confirm a guard
 * precedes this call. */
157 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
159 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
/* Apply j->mtime to the file's timestamps; the results are deliberately
 * ignored. */
162 struct timespec ut[2];
164 timespec_store(&ut[0], j->mtime);
166 (void) futimens(j->disk_fd, ut);
168 (void) fd_setcrtime(j->disk_fd, j->mtime);
175 import_job_finish(j, r);
/* Stores sz bytes of uncompressed data found at p and accounts for them in
 * j->written_uncompressed.
 *
 * j:  the job receiving the data.
 * p:  start of the data.
 * sz: number of bytes at p.
 *
 * NOTE(review): presumably returns 0 on success and a negative errno-style
 * value on failure -- confirm against the return statements. */
178 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
/* Detects wrap-around of the byte counter. */
187 if (j->written_uncompressed + sz < j->written_uncompressed) {
188 log_error("File too large, overflow");
/* Enforces the per-job limit on uncompressed size. */
192 if (j->written_uncompressed + sz > j->uncompressed_max) {
193 log_error("File overly large, refusing");
/* With an open disk fd the data goes to the file, through sparse_write() or
 * plain write().
 * NOTE(review): presumably j->allow_sparse chooses between the two --
 * confirm. */
197 if (j->disk_fd >= 0) {
200 n = sparse_write(j->disk_fd, p, sz, 64);
202 n = write(j->disk_fd, p, sz);
204 log_error_errno(errno, "Failed to write file: %m");
/* Writing fewer bytes than requested is reported as "Short write". */
207 if ((size_t) n < sz) {
208 log_error("Short write");
/* Append the data to the in-memory payload buffer, growing it first.
 * NOTE(review): presumably this is the path taken when no disk fd is open --
 * confirm. */
213 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
216 memcpy(j->payload + j->payload_size, p, sz);
217 j->payload_size += sz;
220 j->written_uncompressed += sz;
/* Consumes sz bytes at p as they arrived from the network: checks size
 * limits, feeds the checksum, routes the bytes by compression type into
 * import_job_write_uncompressed(), and accounts for them in
 * j->written_compressed.
 *
 * j:  the job receiving the data.
 * p:  start of the received data.
 * sz: number of bytes at p.
 *
 * NOTE(review): presumably returns 0 on success and a negative errno-style
 * value on failure -- confirm against the return statements. */
225 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
/* Detects wrap-around of the byte counter. */
234 if (j->written_compressed + sz < j->written_compressed) {
235 log_error("File too large, overflow");
/* Enforces the per-job limit on compressed size. */
239 if (j->written_compressed + sz > j->compressed_max) {
240 log_error("File overly large, refusing.");
/* When a content length is set (anything but (uint64_t) -1), receiving more
 * than that is an error. */
244 if (j->content_length != (uint64_t) -1 &&
245 j->written_compressed + sz > j->content_length) {
246 log_error("Content length incorrect.");
/* The checksum is computed over the bytes as received, before any
 * decompression. */
250 if (j->checksum_context)
251 gcry_md_write(j->checksum_context, p, sz);
253 switch (j->compressed) {
/* Uncompressed data is passed through unchanged. */
255 case IMPORT_JOB_UNCOMPRESSED:
256 r = import_job_write_uncompressed(j, p, sz);
/* XZ: decode into a 16 KiB stack buffer until all input is consumed and
 * forward whatever each round produced. */
266 while (j->xz.avail_in > 0) {
267 uint8_t buffer[16 * 1024];
270 j->xz.next_out = buffer;
271 j->xz.avail_out = sizeof(buffer);
273 lzr = lzma_code(&j->xz, LZMA_RUN);
274 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
275 log_error("Decompression error.");
279 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
/* gzip: same loop shape, using inflate(). */
286 case IMPORT_JOB_GZIP:
288 j->gzip.avail_in = sz;
290 while (j->gzip.avail_in > 0) {
291 uint8_t buffer[16 * 1024];
293 j->gzip.next_out = buffer;
294 j->gzip.avail_out = sizeof(buffer);
296 r = inflate(&j->gzip, Z_NO_FLUSH);
297 if (r != Z_OK && r != Z_STREAM_END) {
298 log_error("Decompression error.");
302 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
/* bzip2: same loop shape, using BZ2_bzDecompress(); next_out is assigned
 * through a char* cast. */
309 case IMPORT_JOB_BZIP2:
310 j->bzip2.next_in = p;
311 j->bzip2.avail_in = sz;
313 while (j->bzip2.avail_in > 0) {
314 uint8_t buffer[16 * 1024];
316 j->bzip2.next_out = (char*) buffer;
317 j->bzip2.avail_out = sizeof(buffer);
319 r = BZ2_bzDecompress(&j->bzip2);
320 if (r != BZ_OK && r != BZ_STREAM_END) {
321 log_error("Decompression error.");
325 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->bzip2.avail_out);
333 assert_not_reached("Unknown compression");
336 j->written_compressed += sz;
/* Prepares the output side of a job: invokes the optional j->on_open_disk
 * hook, probes whether j->disk_fd is seekable to decide whether sparse writes
 * are allowed (j->allow_sparse), and opens a SHA256 checksum context when
 * j->calc_checksum is set.
 *
 * j: the job to prepare.
 *
 * A seek failure is reported via log_error_errno() with errno.
 * NOTE(review): presumably returns 0 on success and a negative errno-style
 * value otherwise -- confirm against the return statements. */
341 static int import_job_open_disk(ImportJob *j) {
346 if (j->on_open_disk) {
347 r = j->on_open_disk(j);
352 if (j->disk_fd >= 0) {
353 /* Check if we can do sparse files */
/* lseek() takes (fd, offset, whence). The arguments used to be passed as
 * (fd, SEEK_SET, 0), which only worked because SEEK_SET is 0. */
355 if (lseek(j->disk_fd, 0, SEEK_SET) == 0)
356 j->allow_sparse = true;
359 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
361 j->allow_sparse = false;
365 if (j->calc_checksum) {
366 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
367 log_error("Failed to initialize hash context.");
/* Inspects the bytes buffered in j->payload, decides whether the stream is
 * XZ, gzip, bzip2 or uncompressed by comparing against magic signatures,
 * initializes the matching decoder, opens the disk side via
 * import_job_open_disk(), switches the job to IMPORT_JOB_RUNNING, and replays
 * the buffered bytes through import_job_write_compressed().
 *
 * j: the job whose buffered payload is analyzed.
 *
 * NOTE(review): presumably returns 0 on success (including "not enough data
 * yet") and a negative errno-style value on failure -- confirm. */
375 static int import_job_detect_compression(ImportJob *j) {
376 static const uint8_t xz_signature[] = {
377 0xfd, '7', 'z', 'X', 'Z', 0x00
379 static const uint8_t gzip_signature[] = {
382 static const uint8_t bzip2_signature[] = {
386 _cleanup_free_ uint8_t *stub = NULL;
/* Detects a payload shorter than the longest signature. */
393 if (j->payload_size < MAX3(sizeof(xz_signature),
394 sizeof(gzip_signature),
395 sizeof(bzip2_signature)))
/* First matching signature wins; with no match the stream is treated as
 * uncompressed. */
398 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
399 j->compressed = IMPORT_JOB_XZ;
400 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
401 j->compressed = IMPORT_JOB_GZIP;
402 else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0)
403 j->compressed = IMPORT_JOB_BZIP2;
405 j->compressed = IMPORT_JOB_UNCOMPRESSED;
407 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
408 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
409 log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == IMPORT_JOB_BZIP2));
/* Initialize only the decoder that matches the detected format. */
411 if (j->compressed == IMPORT_JOB_XZ) {
414 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
415 if (xzr != LZMA_OK) {
416 log_error("Failed to initialize XZ decoder.");
420 if (j->compressed == IMPORT_JOB_GZIP) {
/* Per zlib's inflateInit2() documentation, adding 16 to windowBits selects
 * gzip-format decoding. */
421 r = inflateInit2(&j->gzip, 15+16);
423 log_error("Failed to initialize gzip decoder.");
427 if (j->compressed == IMPORT_JOB_BZIP2) {
428 r = BZ2_bzDecompressInit(&j->bzip2, 0, 0);
430 log_error("Failed to initialize bzip2 decoder.");
435 r = import_job_open_disk(j);
439 /* Now, take the payload we read so far, and decompress it */
/* NOTE(review): 'stub' presumably takes over the buffered payload here so
 * j->payload can start out empty again -- confirm. */
441 stub_size = j->payload_size;
445 j->payload_allocated = 0;
447 j->state = IMPORT_JOB_RUNNING;
449 r = import_job_write_compressed(j, stub, stub_size);
/* Write callback registered with curl via CURLOPT_WRITEFUNCTION.
 *
 * contents: the received body bytes.
 * size, nmemb: their product is the number of bytes at 'contents'.
 * userdata: the ImportJob registered via CURLOPT_WRITEDATA.
 *
 * Dispatches on the job state: while analyzing, bytes are buffered and
 * compression detection is attempted; while running, bytes go straight to
 * import_job_write_compressed().
 * NOTE(review): presumably returns sz on success and a different value to
 * make curl abort the transfer -- confirm against the return statements. */
456 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
457 ImportJob *j = userdata;
458 size_t sz = size * nmemb;
466 case IMPORT_JOB_ANALYZING:
467 /* Let's first check what it actually is */
469 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
474 memcpy(j->payload + j->payload_size, contents, sz);
475 j->payload_size += sz;
477 r = import_job_detect_compression(j);
483 case IMPORT_JOB_RUNNING:
485 r = import_job_write_compressed(j, contents, sz);
/* Terminal states are handled separately from the impossible ones below. */
491 case IMPORT_JOB_DONE:
492 case IMPORT_JOB_FAILED:
497 assert_not_reached("Impossible state.");
/* NOTE(review): presumably the shared failure exit -- confirm. */
503 import_job_finish(j, r);
/* Header callback registered with curl via CURLOPT_HEADERFUNCTION.
 *
 * contents: the header data passed in by curl.
 * size, nmemb: their product is the number of bytes at 'contents'.
 * userdata: the ImportJob registered via CURLOPT_HEADERDATA.
 *
 * Looks for ETag, Content-Length and Last-Modified and forwards the header to
 * j->on_header.
 * NOTE(review): presumably returns sz on success and a different value to
 * make curl abort -- confirm against the return statements. */
507 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
508 ImportJob *j = userdata;
509 size_t sz = size * nmemb;
510 _cleanup_free_ char *length = NULL, *last_modified = NULL;
517 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
/* Headers are only expected while the job is still analyzing. */
522 assert(j->state == IMPORT_JOB_ANALYZING);
524 r = curl_header_strdup(contents, sz, "ETag:", &etag);
/* An ETag that is already in the old_etags list means the image is present:
 * finish the job successfully without downloading. */
533 if (strv_contains(j->old_etags, j->etag)) {
534 log_info("Image already downloaded. Skipping download.");
535 j->etag_exists = true;
536 import_job_finish(j, 0);
543 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
/* The parse result is deliberately ignored. */
549 (void) safe_atou64(length, &j->content_length);
551 if (j->content_length != (uint64_t) -1) {
552 char bytes[FORMAT_BYTES_MAX];
/* Refuse downloads whose announced size exceeds the compressed limit. */
554 if (j->content_length > j->compressed_max) {
555 log_error("Content too large.");
560 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
566 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
/* The parse result is deliberately ignored. */
572 (void) curl_parse_http_time(last_modified, &j->mtime);
/* NOTE(review): j->on_header is called through the pointer here -- confirm a
 * NULL check precedes it. */
577 r = j->on_header(j, contents, sz);
/* NOTE(review): presumably the shared failure exit -- confirm. */
585 import_job_finish(j, r);
/* Progress callback registered with curl via CURLOPT_XFERINFOFUNCTION.
 *
 * userdata: the ImportJob registered via CURLOPT_XFERINFODATA.
 * dltotal:  total number of bytes expected to be downloaded.
 * dlnow:    number of bytes downloaded so far.
 * ultotal, ulnow: upload counterparts; not used in the visible code.
 *
 * Logs download progress, rate-limited by time and by a change in the
 * percentage. */
589 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
590 ImportJob *j = userdata;
/* NOTE(review): dltotal is the divisor -- confirm a zero/negative guard
 * precedes this line. */
599 percent = ((100 * dlnow) / dltotal);
600 n = now(CLOCK_MONOTONIC);
/* Log only if more than a second has passed since the last message and the
 * percentage changed. */
602 if (n > j->last_status_usec + USEC_PER_SEC &&
603 percent != j->progress_percent &&
605 char buf[FORMAT_TIMESPAN_MAX];
/* After the first second with data received, also log estimates: remaining
 * time is extrapolated linearly from elapsed time and bytes so far, and the
 * rate is bytes so far divided by elapsed seconds. */
607 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
608 char y[FORMAT_BYTES_MAX];
611 done = n - j->start_usec;
612 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
614 log_info("Got %u%% of %s. %s left at %s/s.",
617 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
618 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
620 log_info("Got %u%% of %s.", percent, j->url);
/* Remember what was reported and when, for the rate limiting above. */
622 j->progress_percent = percent;
623 j->last_status_usec = n;
/* Allocates a zero-initialized ImportJob and fills in its defaults.
 *
 * ret:      NOTE(review): presumably receives the new job on success --
 *           confirm.
 * url:      the URL to download; copied with strdup().
 * glue:     the curl glue object (NOTE(review): presumably stored in
 *           j->glue -- confirm).
 * userdata: opaque caller pointer stored in j->userdata.
 *
 * NOTE(review): presumably returns 0 on success and a negative errno-style
 * value on allocation failure -- confirm. */
629 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
/* The cleanup attribute unrefs the job automatically when 'j' goes out of
 * scope. */
630 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
636 j = new0(ImportJob, 1);
640 j->state = IMPORT_JOB_INIT;
642 j->userdata = userdata;
/* (uint64_t) -1 is the initial content length; other code in this file skips
 * its length checks for that value. */
644 j->content_length = (uint64_t) -1;
645 j->start_usec = now(CLOCK_MONOTONIC);
646 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
648 j->url = strdup(url);
/* Starts the transfer for a job in the IMPORT_JOB_INIT state: creates the
 * curl handle for j->url, adds an If-None-Match request header when old ETags
 * are known, installs the write/header/progress callbacks with the job as
 * their user data, registers the handle with the glue object, and moves the
 * job to IMPORT_JOB_ANALYZING.
 *
 * j: the job to start.
 *
 * NOTE(review): presumably returns 0 on success and a negative errno-style
 * value on failure -- confirm against the return statements. */
658 int import_job_begin(ImportJob *j) {
663 if (j->state != IMPORT_JOB_INIT)
666 r = curl_glue_make(&j->curl, j->url, j);
/* Known ETags are joined with ", " into a single If-None-Match header. */
670 if (!strv_isempty(j->old_etags)) {
671 _cleanup_free_ char *cc = NULL, *hdr = NULL;
673 cc = strv_join(j->old_etags, ", ");
677 hdr = strappend("If-None-Match: ", cc);
/* Start a new header list or append to the existing one. The append result is
 * held in a temporary before being assigned back to j->request_header. */
681 if (!j->request_header) {
682 j->request_header = curl_slist_new(hdr, NULL);
683 if (!j->request_header)
686 struct curl_slist *l;
688 l = curl_slist_append(j->request_header, hdr);
692 j->request_header = l;
696 if (j->request_header) {
697 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
/* Every callback gets the job itself as its user data pointer. */
701 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
704 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
707 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
710 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
713 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
716 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
/* CURLOPT_NOPROGRESS is set to 0, which per libcurl's documentation enables
 * the progress callback. */
719 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
722 r = curl_glue_add(j->glue, j->curl);
726 j->state = IMPORT_JOB_ANALYZING;