1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2015 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/xattr.h>
25 #include "import-job.h"
/* Releases the resources owned by an ImportJob.
 *
 * Visible steps: the curl easy handle is removed from the glue object and
 * freed, the request header list is freed, the disk file descriptor is passed
 * to safe_close(), a branch on the detected compression type follows, and the
 * list of previously known ETags is freed.
 *
 * NOTE(review): this excerpt does not show a NULL guard on j, the statements
 * inside the two compression branches, the release of the remaining members
 * or the return statement; confirm against the complete file. */
27 ImportJob* import_job_unref(ImportJob *j) {
31 curl_glue_remove_and_free(j->glue, j->curl);
32 curl_slist_free_all(j->request_header);
34 safe_close(j->disk_fd);
/* Decompressor-specific teardown; the branch bodies are not visible here. */
36 if (j->compressed == IMPORT_JOB_XZ)
38 else if (j->compressed == IMPORT_JOB_GZIP)
43 strv_free(j->old_etags);
/* Instantiates the cleanup helper for ImportJob pointers; presumably this is
 * what provides import_job_unrefp, which import_job_new() names in its
 * _cleanup_() attribute. */
51 DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
/* Moves a job into one of its terminal states.
 *
 * j:   the job to finish.
 * ret: result of the job; presumably selects between IMPORT_JOB_DONE and
 *      IMPORT_JOB_FAILED (the selecting condition is not visible in this
 *      excerpt).
 *
 * A job that is already DONE or FAILED is detected up front; the statement
 * executed in that case is not visible (presumably an early return so that a
 * job is only finished once). */
53 static void import_job_finish(ImportJob *j, int ret) {
56 if (j->state == IMPORT_JOB_DONE ||
57 j->state == IMPORT_JOB_FAILED)
61 j->state = IMPORT_JOB_DONE;
63 j->state = IMPORT_JOB_FAILED;
/* Completion handler invoked when a curl transfer has ended.
 *
 * g:      the glue object the transfer belonged to.
 * curl:   the easy handle that finished; the owning ImportJob is looked up
 *         through CURLINFO_PRIVATE.
 * result: curl's result code for the transfer.
 *
 * The handler validates the transfer result and the HTTP status, checks that
 * the job reached the RUNNING state and that the announced Content-Length was
 * received in full, fixes up the size of sparse files, stores the ETag and
 * the URL as extended attributes, applies the remote modification time, and
 * finally reports the outcome via import_job_finish().
 *
 * NOTE(review): the error assignments, jumps and several guarding conditions
 * are not visible in this excerpt. */
71 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
/* Recover the job pointer that was attached to the easy handle. */
77 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
/* Nothing to do without a job, or for a job that already reached a terminal state. */
80 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
83 if (result != CURLE_OK) {
84 log_error("Transfer failed: %s", curl_easy_strerror(result));
89 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
90 if (code != CURLE_OK) {
91 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
/* 304 Not Modified: logged at info level rather than as an error. */
94 } else if (status == 304) {
95 log_info("Image already downloaded. Skipping download.");
98 } else if (status >= 300) {
99 log_error("HTTP request to %s failed with code %li.", j->url, status);
102 } else if (status < 200) {
103 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
/* The write callback switches the job to RUNNING once the compression type is known. */
108 if (j->state != IMPORT_JOB_RUNNING) {
109 log_error("Premature connection termination.");
/* (uint64_t) -1 is the "length unknown" marker set in import_job_new(). */
114 if (j->content_length != (uint64_t) -1 &&
115 j->content_length != j->written_compressed) {
116 log_error("Download truncated.");
121 if (j->disk_fd >= 0 && j->allow_sparse) {
122 /* Make sure the file size is right, in case the file was
123 * sparse and we just seeked for the last part */
125 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
126 log_error_errno(errno, "Failed to truncate file: %m");
/* Best effort: failures to set the extended attributes are deliberately ignored.
 * NOTE(review): strlen(j->etag) requires a non-NULL etag; the guarding
 * condition is not visible in this excerpt - confirm it exists. */
132 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
134 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
/* Best effort: apply the recorded mtime to the file.
 * NOTE(review): futimens() reads both array elements, but only ut[0] is
 * visibly initialised here; the assignment of ut[1] is presumably on a line
 * missing from this excerpt. */
137 struct timespec ut[2];
139 timespec_store(&ut[0], j->mtime);
141 (void) futimens(j->disk_fd, ut);
143 (void) fd_setcrtime(j->disk_fd, j->mtime);
150 import_job_finish(j, r);
/* Stores sz bytes of already decompressed data starting at p.
 *
 * The data goes to j->disk_fd when a file descriptor is open (using
 * sparse_write() or plain write(); the condition choosing between the two is
 * not visible, presumably j->allow_sparse), and the remaining visible
 * statements append it to the in-memory j->payload buffer. On success
 * j->written_uncompressed is advanced by sz.
 *
 * The request is rejected when the running total would wrap around or exceed
 * j->uncompressed_max. Return statements are not visible in this excerpt.
 *
 * NOTE(review): the assert on j->disk_fd below makes the later
 * "if (j->disk_fd >= 0)" test always true in assert-enabled builds, so the
 * payload-buffer path looks unreachable there and would trip the assert for
 * purely in-memory jobs; confirm which of the two is intended. */
154 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
160 assert(j->disk_fd >= 0);
/* Wraparound check on the byte counter (assumes the counter is unsigned - TODO confirm). */
162 if (j->written_uncompressed + sz < j->written_uncompressed) {
163 log_error("File too large, overflow");
167 if (j->written_uncompressed + sz > j->uncompressed_max) {
168 log_error("File overly large, refusing");
172 if (j->disk_fd >= 0) {
175 n = sparse_write(j->disk_fd, p, sz, 64);
177 n = write(j->disk_fd, p, sz);
179 log_error_errno(errno, "Failed to write file: %m");
/* A partial write is treated as an error rather than retried. */
182 if ((size_t) n < sz) {
183 log_error("Short write");
/* Grow the payload buffer and append the new bytes at its end. */
188 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
191 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
192 j->payload_size += sz;
195 j->written_uncompressed += sz;
/* Feeds sz bytes of raw (possibly compressed) download data at p through the
 * decoder selected by j->compressed and hands the result to
 * import_job_write_uncompressed().
 *
 * Before decoding, the running compressed total is checked against
 * wraparound, against j->compressed_max and, when known, against the
 * announced Content-Length. On success j->written_compressed is advanced
 * by sz. Return statements are not visible in this excerpt.
 *
 * NOTE(review): as in import_job_write_uncompressed(), the assert on
 * j->disk_fd rules out jobs without a disk file descriptor; confirm that this
 * is intended. */
200 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
206 assert(j->disk_fd >= 0);
/* Wraparound check on the byte counter (assumes the counter is unsigned - TODO confirm). */
208 if (j->written_compressed + sz < j->written_compressed) {
209 log_error("File too large, overflow");
213 if (j->written_compressed + sz > j->compressed_max) {
214 log_error("File overly large, refusing.");
/* The server must not send more than it announced. */
218 if (j->content_length != (uint64_t) -1 &&
219 j->written_compressed + sz > j->content_length) {
220 log_error("Content length incorrect.");
224 switch (j->compressed) {
/* No decoding needed: pass the bytes straight through. */
226 case IMPORT_JOB_UNCOMPRESSED:
227 r = import_job_write_uncompressed(j, p, sz);
/* XZ: decode until all pending input is consumed, 16 KiB of output at a time.
 * The case label and the setup of the input pointers are not visible here. */
237 while (j->xz.avail_in > 0) {
238 uint8_t buffer[16 * 1024];
241 j->xz.next_out = buffer;
242 j->xz.avail_out = sizeof(buffer);
244 lzr = lzma_code(&j->xz, LZMA_RUN);
245 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
246 log_error("Decompression error.");
/* Only the part of the buffer the decoder actually filled is written out. */
250 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
/* gzip: same scheme as XZ, using zlib's inflate(). */
257 case IMPORT_JOB_GZIP:
259 j->gzip.avail_in = sz;
261 while (j->gzip.avail_in > 0) {
262 uint8_t buffer[16 * 1024];
264 j->gzip.next_out = buffer;
265 j->gzip.avail_out = sizeof(buffer);
/* Note that r first carries a zlib status code and is then reused for the write result. */
267 r = inflate(&j->gzip, Z_NO_FLUSH);
268 if (r != Z_OK && r != Z_STREAM_END) {
269 log_error("Decompression error.");
273 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
281 assert_not_reached("Unknown compression");
284 j->written_compressed += sz;
/* Gives the job's owner the chance to open the destination file and then
 * probes whether that file supports seeking.
 *
 * If an on_open_disk callback is set it is invoked first (the handling of its
 * result is not visible in this excerpt). When j->disk_fd is valid
 * afterwards, a seek is attempted: success enables sparse writing, otherwise
 * the visible statements either report the seek error or disable sparse
 * writing (the condition separating those two outcomes is not visible).
 *
 * NOTE(review): the lseek() call below has its last two arguments swapped.
 * The prototype is lseek(fd, offset, whence), but SEEK_SET is passed as the
 * offset and 0 as whence. This only behaves like lseek(fd, 0, SEEK_SET)
 * because SEEK_SET has the value 0 on common platforms; the call should read
 * lseek(j->disk_fd, 0, SEEK_SET). */
289 static int import_job_open_disk(ImportJob *j) {
294 if (j->on_open_disk) {
295 r = j->on_open_disk(j);
300 if (j->disk_fd >= 0) {
301 /* Check if we can do sparse files */
303 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
304 j->allow_sparse = true;
307 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
309 j->allow_sparse = false;
/* Determines the compression format of the download from the first bytes
 * collected in j->payload, sets up the matching decoder, opens the
 * destination via import_job_open_disk(), switches the job to
 * IMPORT_JOB_RUNNING and replays the buffered bytes through
 * import_job_write_compressed().
 *
 * The payload is compared against the XZ and gzip magic numbers; anything
 * else is treated as uncompressed. The check for "not enough data yet" is
 * visible, but what is returned in that case is not (presumably the caller
 * is asked to wait for more data).
 *
 * NOTE(review): the contents of gzip_signature, the hand-over of j->payload
 * to the local stub buffer, and all return statements are not visible in
 * this excerpt. */
316 static int import_job_detect_compression(ImportJob *j) {
317 static const uint8_t xz_signature[] = {
318 0xfd, '7', 'z', 'X', 'Z', 0x00
320 static const uint8_t gzip_signature[] = {
/* Freed automatically when the function returns. */
324 _cleanup_free_ uint8_t *stub = NULL;
/* Both signatures must fit into what has been received so far. */
331 if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
334 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
335 j->compressed = IMPORT_JOB_XZ;
336 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
337 j->compressed = IMPORT_JOB_GZIP;
339 j->compressed = IMPORT_JOB_UNCOMPRESSED;
341 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
342 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
344 if (j->compressed == IMPORT_JOB_XZ) {
/* UINT64_MAX: no memory usage limit is imposed on the XZ decoder. */
347 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
348 if (xzr != LZMA_OK) {
349 log_error("Failed to initialize XZ decoder.");
353 if (j->compressed == IMPORT_JOB_GZIP) {
/* windowBits 15 plus 16 asks zlib to expect a gzip header and trailer. */
354 r = inflateInit2(&j->gzip, 15+16);
356 log_error("Failed to initialize gzip decoder.");
361 r = import_job_open_disk(j);
365 /* Now, take the payload we read so far, and decompress it */
367 stub_size = j->payload_size;
372 j->state = IMPORT_JOB_RUNNING;
374 r = import_job_write_compressed(j, stub, stub_size);
/* Body-data callback registered with curl via CURLOPT_WRITEFUNCTION in
 * import_job_begin().
 *
 * contents/size/nmemb: the received chunk; its length is size * nmemb bytes.
 * userdata:            the ImportJob, as registered with CURLOPT_WRITEDATA.
 *
 * While the job is ANALYZING, the chunk is appended to j->payload and
 * compression detection is attempted. Once RUNNING, the chunk is passed
 * directly to import_job_write_compressed(). The final import_job_finish()
 * call is presumably the failure path; return values and the handling of the
 * terminal states are not visible in this excerpt. */
381 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
382 ImportJob *j = userdata;
383 size_t sz = size * nmemb;
391 case IMPORT_JOB_ANALYZING:
392 /* Let's first check what it actually is */
394 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
399 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
400 j->payload_size += sz;
402 r = import_job_detect_compression(j);
408 case IMPORT_JOB_RUNNING:
410 r = import_job_write_compressed(j, contents, sz);
416 case IMPORT_JOB_DONE:
417 case IMPORT_JOB_FAILED:
422 assert_not_reached("Impossible state.");
428 import_job_finish(j, r);
/* Header callback registered with curl via CURLOPT_HEADERFUNCTION in
 * import_job_begin(); the visible code passes each invocation's buffer to
 * the individual header parsers.
 *
 * contents/size/nmemb: the header data; its length is size * nmemb bytes.
 * userdata:            the ImportJob, as registered with CURLOPT_HEADERDATA.
 *
 * Three headers are evaluated:
 *   ETag:           if the job's ETag is among j->old_etags the image is
 *                   already present and the job is finished successfully.
 *   Content-Length: parsed into j->content_length and checked against
 *                   j->compressed_max.
 *   Last-Modified:  parsed into j->mtime.
 *
 * NOTE(review): the declaration of etag, the assignment to j->etag, the
 * return values and the conditions around each parser call are not visible
 * in this excerpt. */
432 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
433 ImportJob *j = userdata;
434 size_t sz = size * nmemb;
435 _cleanup_free_ char *length = NULL, *last_modified = NULL;
442 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
/* Headers are only expected before the first body data has been classified. */
447 assert(j->state == IMPORT_JOB_ANALYZING);
449 r = curl_header_strdup(contents, sz, "ETag:", &etag);
458 if (strv_contains(j->old_etags, j->etag)) {
459 log_info("Image already downloaded. Skipping download.");
460 import_job_finish(j, 0);
467 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
/* The parse result is deliberately discarded; presumably an unparsable
 * value leaves content_length at its previous value - TODO confirm. */
473 (void) safe_atou64(length, &j->content_length);
475 if (j->content_length != (uint64_t) -1) {
476 char bytes[FORMAT_BYTES_MAX];
478 if (j->content_length > j->compressed_max) {
479 log_error("Content too large.");
484 log_info("Downloading %s.", format_bytes(bytes, sizeof(bytes), j->content_length));
490 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
/* Best effort: a timestamp that cannot be parsed is ignored. */
496 (void) curl_parse_http_time(last_modified, &j->mtime);
503 import_job_finish(j, r);
/* Transfer progress callback registered with curl via
 * CURLOPT_XFERINFOFUNCTION in import_job_begin().
 *
 * userdata:        the ImportJob, as registered with CURLOPT_XFERINFODATA.
 * dltotal/dlnow:   expected and current number of downloaded bytes.
 * ultotal/ulnow:   upload counters; not used by the visible code.
 *
 * A progress line is logged when more than one second has passed since the
 * last one and the integer percentage has changed. Once the transfer has
 * been running for more than a second and some data has arrived, the message
 * also carries an estimate of the remaining time, extrapolated linearly from
 * the elapsed time.
 *
 * NOTE(review): the division by dltotal needs a guard against a zero (or
 * unknown) total; that guard and the return value are not visible in this
 * excerpt - confirm they exist. */
507 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
508 ImportJob *j = userdata;
517 percent = ((100 * dlnow) / dltotal);
518 n = now(CLOCK_MONOTONIC);
520 if (n > j->last_status_usec + USEC_PER_SEC &&
521 percent != j->progress_percent) {
522 char buf[FORMAT_TIMESPAN_MAX];
524 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
/* Estimated total duration is done * dltotal / dlnow; subtracting done leaves the remainder. */
527 done = n - j->start_usec;
528 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
530 log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
532 log_info("Got %u%% of %s.", percent, j->url);
/* Remember what was reported so the next call can rate-limit against it. */
534 j->progress_percent = percent;
535 j->last_status_usec = n;
/* Allocates and initialises a new ImportJob for the given URL.
 *
 * ret:      where the new job is returned (the assignment is not visible in
 *           this excerpt).
 * url:      download URL; duplicated into the job.
 * glue:     curl glue object (its assignment to the job is not visible in
 *           this excerpt).
 * userdata: opaque pointer stored in j->userdata.
 *
 * The job starts in IMPORT_JOB_INIT with an unknown content length
 * ((uint64_t) -1), the current monotonic time as start time and a limit of
 * 8 * 1024^3 bytes for both the compressed and the uncompressed size.
 *
 * Argument checks, allocation-failure handling and the return statements
 * are not visible in this excerpt. */
541 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
/* The cleanup attribute drops the half-built job automatically on early exit. */
542 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
548 j = new0(ImportJob, 1);
552 j->state = IMPORT_JOB_INIT;
554 j->userdata = userdata;
556 j->content_length = (uint64_t) -1;
557 j->start_usec = now(CLOCK_MONOTONIC);
558 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
560 j->url = strdup(url);
/* Starts the download for a job that is still in IMPORT_JOB_INIT.
 *
 * A curl easy handle is created for j->url with the job as its private
 * pointer. If old ETags are known they are joined with ", " and sent as an
 * "If-None-Match:" request header. The write, header and progress callbacks
 * are then installed with the job as their user data, the handle is added to
 * the glue object, and the job moves to IMPORT_JOB_ANALYZING.
 *
 * NOTE(review): the error returns after each check and the end of the
 * function are not visible in this excerpt. */
570 int import_job_begin(ImportJob *j) {
575 if (j->state != IMPORT_JOB_INIT)
578 r = curl_glue_make(&j->curl, j->url, j);
582 if (!strv_isempty(j->old_etags)) {
583 _cleanup_free_ char *cc = NULL, *hdr = NULL;
585 cc = strv_join(j->old_etags, ", ");
589 hdr = strappend("If-None-Match: ", cc);
/* The list is kept in the job and released in import_job_unref(). */
593 j->request_header = curl_slist_new(hdr, NULL);
594 if (!j->request_header)
597 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
601 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
604 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
607 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
610 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
613 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
616 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
/* Progress reporting is off by default in curl; 0 switches the callback on. */
619 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
622 r = curl_glue_add(j->glue, j->curl);
626 j->state = IMPORT_JOB_ANALYZING;