1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2015 Lennart Poettering
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <sys/xattr.h>
25 #include "import-job.h"
/* Destructor: releases everything an ImportJob owns — the curl easy
 * handle (removed via its CurlGlue), the custom request-header list,
 * the disk fd, the active decompressor state (xz or gzip, whichever
 * was set up by import_job_detect_compression()) and the stored ETag
 * list.
 * NOTE(review): this extract is incomplete — the NULL guard, the
 * decoder-specific teardown statements under the if/else below, the
 * free() of the remaining members and of the job itself, and the
 * return value are not visible here; confirm against the full file. */
27 ImportJob* import_job_unref(ImportJob *j) {
31 curl_glue_remove_and_free(j->glue, j->curl);
32 curl_slist_free_all(j->request_header);
/* safe_close() tolerates negative fds, so no guard is needed here. */
34 safe_close(j->disk_fd);
/* Tear down whichever decompressor was initialized; bodies of these
 * branches (presumably lzma_end()/inflateEnd()) are elided in this
 * extract. */
36 if (j->compressed == IMPORT_JOB_XZ)
38 else if (j->compressed == IMPORT_JOB_GZIP)
43 strv_free(j->old_etags);
/* Generates import_job_unrefp() so ImportJob pointers can be declared
 * with _cleanup_(import_job_unrefp) for automatic cleanup on scope
 * exit (used e.g. in import_job_new() below). */
51 DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
/* Move the job into a terminal state: ret == 0 → IMPORT_JOB_DONE,
 * otherwise → IMPORT_JOB_FAILED, logging the outcome. Idempotent: a
 * job already in a terminal state is left untouched.
 * NOTE(review): partial extract — the early return for the terminal
 * case and any user on_finished callback invocation are not visible
 * here. */
53 static void import_job_finish(ImportJob *j, int ret) {
/* Already finished or failed — nothing further to do. */
56 if (j->state == IMPORT_JOB_DONE ||
57 j->state == IMPORT_JOB_FAILED)
61 j->state = IMPORT_JOB_DONE;
62 log_info("Download of %s complete.", j->url);
64 j->state = IMPORT_JOB_FAILED;
/* Completion callback invoked by the CurlGlue layer when a transfer
 * ends. Maps the curl result and HTTP status code onto the job's
 * outcome, verifies the download was complete, finalizes the on-disk
 * file (size, xattrs, timestamps) and then calls import_job_finish().
 * NOTE(review): partial extract — the error-path "goto finish"
 * statements and several declarations (j, code, status, r) between
 * the visible lines are elided. */
72 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
/* Recover the ImportJob attached to this easy handle. */
78 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
/* Ignore stale callbacks for jobs that already reached a terminal
 * state (e.g. finished early on a matching ETag). */
81 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
84 if (result != CURLE_OK) {
85 log_error("Transfer failed: %s", curl_easy_strerror(result));
90 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
91 if (code != CURLE_OK) {
92 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
/* 304 Not Modified: server confirmed our If-None-Match ETag, the
 * cached copy is current — treated as success, not as an error. */
95 } else if (status == 304) {
96 log_info("Image already downloaded. Skipping download.");
99 } else if (status >= 300) {
100 log_error("HTTP request to %s failed with code %li.", j->url, status);
/* Anything below 2xx at completion time is unexpected. */
103 } else if (status < 200) {
104 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
/* If we never left ANALYZING for RUNNING, the body was cut short
 * before payload processing even started. */
109 if (j->state != IMPORT_JOB_RUNNING) {
110 log_error("Premature connection termination.");
/* (uint64_t) -1 is the "unknown" sentinel for content_length; when a
 * length was announced, the compressed byte count must match it. */
115 if (j->content_length != (uint64_t) -1 &&
116 j->content_length != j->written_compressed) {
117 log_error("Download truncated.");
122 if (j->disk_fd >= 0 && j->allow_sparse) {
123 /* Make sure the file size is right, in case the file was
124 * sparse and we just seeked for the last part */
126 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
127 log_error_errno(errno, "Failed to truncate file: %m");
/* Best-effort metadata: record provenance in xattrs; failure to set
 * them is deliberately ignored (void casts). */
133 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
135 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
136 struct timespec ut[2];
/* Propagate the server's Last-Modified time to both timestamps of
 * ut[] (second store elided in this extract) and the btrfs crtime. */
140 timespec_store(&ut[0], j->mtime);
142 (void) futimens(j->disk_fd, ut);
144 (void) fd_setcrtime(j->disk_fd, j->mtime);
151 import_job_finish(j, r);
/* Sink for decompressed payload bytes: writes them to the disk fd if
 * one is open (sparsely when permitted), otherwise appends them to
 * the in-memory payload buffer; maintains written_uncompressed.
 * Returns 0 on success, negative errno-style error otherwise (error
 * returns elided in this extract).
 * NOTE(review): the assert below demands disk_fd >= 0, yet the code
 * later branches on "if (j->disk_fd >= 0)" with an in-memory
 * fallback — one of the two is redundant or the assert is an
 * extraction artifact; confirm against the full file. */
155 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
161 assert(j->disk_fd >= 0);
/* Guard against size_t wrap-around before growing the counter. */
163 if (j->written_uncompressed + sz < j->written_uncompressed) {
164 log_error("File too large, overflow");
/* Enforce the configured decompressed-size ceiling (see
 * uncompressed_max set in import_job_new()). */
168 if (j->written_uncompressed + sz > j->uncompressed_max) {
169 log_error("File overly large, refusing");
173 if (j->disk_fd >= 0) {
/* Sparse path: skip runs of zeros at 64-byte granularity. */
176 n = sparse_write(j->disk_fd, p, sz, 64);
178 n = write(j->disk_fd, p, sz);
180 log_error_errno(errno, "Failed to write file: %m");
183 if ((size_t) n < sz) {
184 log_error("Short write");
/* No disk fd: accumulate into the growable payload buffer instead. */
189 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
192 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
193 j->payload_size += sz;
196 j->written_uncompressed += sz;
/* Sink for raw (possibly compressed) wire bytes: validates size
 * limits, then routes the data through the decoder selected by
 * j->compressed — passthrough, liblzma, or zlib — feeding the
 * decompressed output to import_job_write_uncompressed(). Maintains
 * written_compressed. Error returns and the xz avail_in setup are
 * elided in this extract.
 * NOTE(review): same assert/branch tension as in
 * import_job_write_uncompressed() — see note there. */
201 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
207 assert(j->disk_fd >= 0);
/* Guard against size_t wrap-around of the running total. */
209 if (j->written_compressed + sz < j->written_compressed) {
210 log_error("File too large, overflow");
211 if (j->written_compressed + sz > j->compressed_max) {
215 log_error("File overly large, refusing.");
/* If the server announced a Content-Length, never accept more bytes
 * than that ((uint64_t) -1 means "unknown"). */
219 if (j->content_length != (uint64_t) -1 &&
220 j->written_compressed + sz > j->content_length) {
221 log_error("Content length incorrect.");
225 switch (j->compressed) {
227 case IMPORT_JOB_UNCOMPRESSED:
/* No decoding needed — hand the bytes straight through. */
228 r = import_job_write_uncompressed(j, p, sz);
/* xz path: drain the input through lzma_code() in fixed-size output
 * chunks until all input is consumed. */
238 while (j->xz.avail_in > 0) {
239 uint8_t buffer[16 * 1024];
242 j->xz.next_out = buffer;
243 j->xz.avail_out = sizeof(buffer);
245 lzr = lzma_code(&j->xz, LZMA_RUN);
246 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
247 log_error("Decompression error.");
/* Amount produced = buffer capacity minus what is still free. */
251 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
258 case IMPORT_JOB_GZIP:
/* gzip path: same chunked drain loop, via zlib's inflate(). */
260 j->gzip.avail_in = sz;
262 while (j->gzip.avail_in > 0) {
263 uint8_t buffer[16 * 1024];
265 j->gzip.next_out = buffer;
266 j->gzip.avail_out = sizeof(buffer);
268 r = inflate(&j->gzip, Z_NO_FLUSH);
269 if (r != Z_OK && r != Z_STREAM_END) {
270 log_error("Decompression error.");
274 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
/* All compression kinds are handled above; reaching here is a bug. */
282 assert_not_reached("Unknown compression");
285 j->written_compressed += sz;
/* Give the user callback a chance to open the destination file, then
 * probe whether the resulting fd is seekable so sparse writing can be
 * enabled.
 * BUG(review): lseek()'s prototype is lseek(fd, offset, whence) — the
 * call below passes (fd, SEEK_SET, 0), i.e. offset and whence are
 * transposed. It only behaves as intended because SEEK_SET == 0 on
 * Linux; it should read lseek(j->disk_fd, 0, SEEK_SET). Fix in the
 * full source.
 * NOTE(review): partial extract — the error handling between the
 * seek and the "return log_error_errno(...)" line (presumably an
 * ESPIPE check for non-seekable fds) is elided. */
290 static int import_job_open_disk(ImportJob *j) {
295 if (j->on_open_disk) {
296 r = j->on_open_disk(j);
301 if (j->disk_fd >= 0) {
302 /* Check if we can do sparse files */
304 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
305 j->allow_sparse = true;
308 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
310 j->allow_sparse = false;
/* Sniff the compression format from the first payload bytes using
 * magic-number signatures, initialize the matching decoder, open the
 * destination file, switch the job to RUNNING and replay the buffered
 * stub through the decompression path. Returns 0 on success (error
 * returns, the gzip signature bytes and the payload hand-off to
 * 'stub' are elided in this extract). */
317 static int import_job_detect_compression(ImportJob *j) {
/* xz magic: FD 37 7A 58 5A 00 ("\xfd7zXZ\0"). */
318 static const uint8_t xz_signature[] = {
319 0xfd, '7', 'z', 'X', 'Z', 0x00
/* gzip magic bytes (1F 8B, presumably) are elided in this extract. */
321 static const uint8_t gzip_signature[] = {
325 _cleanup_free_ uint8_t *stub = NULL;
/* Not enough bytes yet to match the longest signature — wait for the
 * next write callback to deliver more. */
332 if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
335 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
336 j->compressed = IMPORT_JOB_XZ;
337 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
338 j->compressed = IMPORT_JOB_GZIP;
340 j->compressed = IMPORT_JOB_UNCOMPRESSED;
342 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
343 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
345 if (j->compressed == IMPORT_JOB_XZ) {
/* No memory limit on the decoder; reject streams whose integrity
 * check liblzma does not support. */
348 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
349 if (xzr != LZMA_OK) {
350 log_error("Failed to initialize XZ decoder.");
354 if (j->compressed == IMPORT_JOB_GZIP) {
/* windowBits 15+16 tells zlib to expect a gzip (not raw deflate)
 * header. */
355 r = inflateInit2(&j->gzip, 15+16);
357 log_error("Failed to initialize gzip decoder.");
362 r = import_job_open_disk(j);
366 /* Now, take the payload we read so far, and decompress it */
368 stub_size = j->payload_size;
373 j->state = IMPORT_JOB_RUNNING;
375 r = import_job_write_compressed(j, stub, stub_size);
/* curl CURLOPT_WRITEFUNCTION callback: receives size*nmemb body
 * bytes. While ANALYZING it buffers them and retries format
 * detection; once RUNNING it streams them through the decompressor.
 * On any internal error the job is failed via import_job_finish().
 * NOTE(review): partial extract — the success return (sz) and the
 * error-path returns around the "fail:" label are elided; a curl
 * write callback signals failure by returning a count != sz. */
382 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
383 ImportJob *j = userdata;
384 size_t sz = size * nmemb;
392 case IMPORT_JOB_ANALYZING:
393 /* Let's first check what it actually is */
395 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
400 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
401 j->payload_size += sz;
/* May flip the state to RUNNING once enough bytes are buffered. */
403 r = import_job_detect_compression(j);
409 case IMPORT_JOB_RUNNING:
411 r = import_job_write_compressed(j, contents, sz);
/* Data arriving after a terminal state is a protocol violation. */
417 case IMPORT_JOB_DONE:
418 case IMPORT_JOB_FAILED:
423 assert_not_reached("Impossible state.");
429 import_job_finish(j, r);
/* curl CURLOPT_HEADERFUNCTION callback: parses response headers one
 * at a time. Captures the ETag (finishing early with success if it
 * matches a previously seen one), the Content-Length (rejecting
 * oversized downloads up front) and the Last-Modified time.
 * NOTE(review): partial extract — the returns (sz on success), the
 * storing of the parsed ETag into j->etag, and the "r > 0 / continue
 * to next header" control flow between the curl_header_strdup()
 * calls are elided. */
433 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
434 ImportJob *j = userdata;
435 size_t sz = size * nmemb;
436 _cleanup_free_ char *length = NULL, *last_modified = NULL;
/* Ignore headers that trickle in after the job already ended. */
443 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
/* Headers must all arrive before any body data switched us to
 * RUNNING. */
448 assert(j->state == IMPORT_JOB_ANALYZING);
450 r = curl_header_strdup(contents, sz, "ETag:", &etag);
/* Cache hit: the server's ETag matches one we already have — finish
 * successfully without downloading the body. */
459 if (strv_contains(j->old_etags, j->etag)) {
460 log_info("Image already downloaded. Skipping download.");
461 import_job_finish(j, 0);
468 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
/* Parse failure leaves content_length at its (uint64_t) -1 sentinel. */
474 (void) safe_atou64(length, &j->content_length);
476 if (j->content_length != (uint64_t) -1) {
477 char bytes[FORMAT_BYTES_MAX];
/* Reject before transferring anything if the announced size already
 * exceeds our ceiling. */
479 if (j->content_length > j->compressed_max) {
480 log_error("Content too large.");
485 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
491 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
/* Best effort: an unparsable date simply leaves mtime unset. */
497 (void) curl_parse_http_time(last_modified, &j->mtime);
504 import_job_finish(j, r);
/* curl CURLOPT_XFERINFOFUNCTION callback: logs download progress (and
 * a linearly extrapolated ETA) at most once per second and only when
 * the percentage actually changed.
 * NOTE(review): partial extract — the guard that must precede the
 * division (dltotal <= 0 yields a division by zero otherwise), the
 * declarations and the return 0 are elided; confirm the full source
 * checks dltotal before dividing. */
508 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
509 ImportJob *j = userdata;
518 percent = ((100 * dlnow) / dltotal);
519 n = now(CLOCK_MONOTONIC);
/* Rate-limit: at most one log line per second, and only on change. */
521 if (n > j->last_status_usec + USEC_PER_SEC &&
522 percent != j->progress_percent &&
524 char buf[FORMAT_TIMESPAN_MAX];
/* Only estimate time-left once at least a second has elapsed and
 * some bytes have arrived, so the extrapolation is meaningful. */
526 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
529 done = n - j->start_usec;
/* left = projected total duration minus elapsed time. */
530 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
532 log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
534 log_info("Got %u%% of %s.", percent, j->url);
536 j->progress_percent = percent;
537 j->last_status_usec = n;
/* Allocate and initialize a new ImportJob for the given URL on the
 * given CurlGlue event loop; on success ownership is transferred to
 * *ret. Returns 0 on success, negative errno-style error on OOM
 * (elided here, as are the url NULL check, j->glue assignment, the
 * disk_fd = -1 default and the final *ret hand-off). */
543 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
/* Auto-freed via import_job_unref() on every early-exit path. */
544 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
550 j = new0(ImportJob, 1);
554 j->state = IMPORT_JOB_INIT;
556 j->userdata = userdata;
/* (uint64_t) -1 marks "Content-Length unknown" until a header says
 * otherwise. */
558 j->content_length = (uint64_t) -1;
559 j->start_usec = now(CLOCK_MONOTONIC);
560 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
562 j->url = strdup(url);
/* Start the transfer for an INIT-state job: create the curl easy
 * handle, attach an If-None-Match header built from all previously
 * seen ETags (so an unchanged image yields a cheap 304), wire up the
 * write/header/progress callbacks, register the handle with the glue
 * event loop and move the job to ANALYZING.
 * NOTE(review): partial extract — the error returns after each
 * failed setopt (presumably -EIO / -ENOMEM) and the function's tail
 * (return 0 and closing brace) are not visible here. */
572 int import_job_begin(ImportJob *j) {
/* Starting twice (or after completion) is a caller error. */
577 if (j->state != IMPORT_JOB_INIT)
580 r = curl_glue_make(&j->curl, j->url, j);
584 if (!strv_isempty(j->old_etags)) {
585 _cleanup_free_ char *cc = NULL, *hdr = NULL;
/* RFC 7232 allows a comma-separated list of entity tags. */
587 cc = strv_join(j->old_etags, ", ");
591 hdr = strappend("If-None-Match: ", cc);
595 j->request_header = curl_slist_new(hdr, NULL);
596 if (!j->request_header)
599 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
603 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
606 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
609 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
612 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
615 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
618 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
/* Progress reporting is off by default in libcurl; enable it so the
 * XFERINFOFUNCTION above actually fires. */
621 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
624 r = curl_glue_add(j->glue, j->curl);
628 j->state = IMPORT_JOB_ANALYZING;