/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/

/***
  This file is part of systemd.

  Copyright 2015 Lennart Poettering

  systemd is free software; you can redistribute it and/or modify it
  under the terms of the GNU Lesser General Public License as published by
  the Free Software Foundation; either version 2.1 of the License, or
  (at your option) any later version.

  systemd is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/

#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/xattr.h>
#include <unistd.h>

#include "import-job.h"

27 ImportJob* import_job_unref(ImportJob *j) {
31 curl_glue_remove_and_free(j->glue, j->curl);
32 curl_slist_free_all(j->request_header);
34 safe_close(j->disk_fd);
36 if (j->compressed == IMPORT_JOB_XZ)
38 else if (j->compressed == IMPORT_JOB_GZIP)
43 strv_free(j->old_etags);
51 DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
53 static void import_job_finish(ImportJob *j, int ret) {
56 if (j->state == IMPORT_JOB_DONE ||
57 j->state == IMPORT_JOB_FAILED)
61 j->state = IMPORT_JOB_DONE;
62 log_info("Download of %s complete.", j->url);
64 j->state = IMPORT_JOB_FAILED;
72 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
78 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
81 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
84 if (result != CURLE_OK) {
85 log_error("Transfer failed: %s", curl_easy_strerror(result));
90 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
91 if (code != CURLE_OK) {
92 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
95 } else if (status == 304) {
96 log_info("Image already downloaded. Skipping download.");
99 } else if (status >= 300) {
100 log_error("HTTP request to %s failed with code %li.", j->url, status);
103 } else if (status < 200) {
104 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
109 if (j->state != IMPORT_JOB_RUNNING) {
110 log_error("Premature connection termination.");
115 if (j->content_length != (uint64_t) -1 &&
116 j->content_length != j->written_compressed) {
117 log_error("Download truncated.");
122 if (j->disk_fd >= 0 && j->allow_sparse) {
123 /* Make sure the file size is right, in case the file was
124 * sparse and we just seeked for the last part */
126 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
127 log_error_errno(errno, "Failed to truncate file: %m");
133 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
135 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
138 struct timespec ut[2];
140 timespec_store(&ut[0], j->mtime);
142 (void) futimens(j->disk_fd, ut);
144 (void) fd_setcrtime(j->disk_fd, j->mtime);
151 import_job_finish(j, r);
155 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
161 assert(j->disk_fd >= 0);
163 if (j->written_uncompressed + sz < j->written_uncompressed) {
164 log_error("File too large, overflow");
168 if (j->written_uncompressed + sz > j->uncompressed_max) {
169 log_error("File overly large, refusing");
173 if (j->disk_fd >= 0) {
176 n = sparse_write(j->disk_fd, p, sz, 64);
178 n = write(j->disk_fd, p, sz);
180 log_error_errno(errno, "Failed to write file: %m");
183 if ((size_t) n < sz) {
184 log_error("Short write");
189 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
192 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
193 j->payload_size += sz;
196 j->written_uncompressed += sz;
201 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
207 assert(j->disk_fd >= 0);
209 if (j->written_compressed + sz < j->written_compressed) {
210 log_error("File too large, overflow");
214 if (j->written_compressed + sz > j->compressed_max) {
215 log_error("File overly large, refusing.");
219 if (j->content_length != (uint64_t) -1 &&
220 j->written_compressed + sz > j->content_length) {
221 log_error("Content length incorrect.");
225 switch (j->compressed) {
227 case IMPORT_JOB_UNCOMPRESSED:
228 r = import_job_write_uncompressed(j, p, sz);
238 while (j->xz.avail_in > 0) {
239 uint8_t buffer[16 * 1024];
242 j->xz.next_out = buffer;
243 j->xz.avail_out = sizeof(buffer);
245 lzr = lzma_code(&j->xz, LZMA_RUN);
246 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
247 log_error("Decompression error.");
251 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
258 case IMPORT_JOB_GZIP:
260 j->gzip.avail_in = sz;
262 while (j->gzip.avail_in > 0) {
263 uint8_t buffer[16 * 1024];
265 j->gzip.next_out = buffer;
266 j->gzip.avail_out = sizeof(buffer);
268 r = inflate(&j->gzip, Z_NO_FLUSH);
269 if (r != Z_OK && r != Z_STREAM_END) {
270 log_error("Decompression error.");
274 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
282 assert_not_reached("Unknown compression");
285 j->written_compressed += sz;
290 static int import_job_open_disk(ImportJob *j) {
295 if (j->on_open_disk) {
296 r = j->on_open_disk(j);
301 if (j->disk_fd >= 0) {
302 /* Check if we can do sparse files */
304 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
305 j->allow_sparse = true;
308 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
310 j->allow_sparse = false;
317 static int import_job_detect_compression(ImportJob *j) {
318 static const uint8_t xz_signature[] = {
319 0xfd, '7', 'z', 'X', 'Z', 0x00
321 static const uint8_t gzip_signature[] = {
325 _cleanup_free_ uint8_t *stub = NULL;
332 if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
335 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
336 j->compressed = IMPORT_JOB_XZ;
337 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
338 j->compressed = IMPORT_JOB_GZIP;
340 j->compressed = IMPORT_JOB_UNCOMPRESSED;
342 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
343 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
345 if (j->compressed == IMPORT_JOB_XZ) {
348 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
349 if (xzr != LZMA_OK) {
350 log_error("Failed to initialize XZ decoder.");
354 if (j->compressed == IMPORT_JOB_GZIP) {
355 r = inflateInit2(&j->gzip, 15+16);
357 log_error("Failed to initialize gzip decoder.");
362 r = import_job_open_disk(j);
366 /* Now, take the payload we read so far, and decompress it */
368 stub_size = j->payload_size;
372 j->payload_allocated = 0;
374 j->state = IMPORT_JOB_RUNNING;
376 r = import_job_write_compressed(j, stub, stub_size);
383 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
384 ImportJob *j = userdata;
385 size_t sz = size * nmemb;
393 case IMPORT_JOB_ANALYZING:
394 /* Let's first check what it actually is */
396 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
401 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
402 j->payload_size += sz;
404 r = import_job_detect_compression(j);
410 case IMPORT_JOB_RUNNING:
412 r = import_job_write_compressed(j, contents, sz);
418 case IMPORT_JOB_DONE:
419 case IMPORT_JOB_FAILED:
424 assert_not_reached("Impossible state.");
430 import_job_finish(j, r);
434 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
435 ImportJob *j = userdata;
436 size_t sz = size * nmemb;
437 _cleanup_free_ char *length = NULL, *last_modified = NULL;
444 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
449 assert(j->state == IMPORT_JOB_ANALYZING);
451 r = curl_header_strdup(contents, sz, "ETag:", &etag);
460 if (strv_contains(j->old_etags, j->etag)) {
461 log_info("Image already downloaded. Skipping download.");
462 import_job_finish(j, 0);
469 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
475 (void) safe_atou64(length, &j->content_length);
477 if (j->content_length != (uint64_t) -1) {
478 char bytes[FORMAT_BYTES_MAX];
480 if (j->content_length > j->compressed_max) {
481 log_error("Content too large.");
486 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
492 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
498 (void) curl_parse_http_time(last_modified, &j->mtime);
505 import_job_finish(j, r);
509 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
510 ImportJob *j = userdata;
519 percent = ((100 * dlnow) / dltotal);
520 n = now(CLOCK_MONOTONIC);
522 if (n > j->last_status_usec + USEC_PER_SEC &&
523 percent != j->progress_percent &&
525 char buf[FORMAT_TIMESPAN_MAX];
527 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
530 done = n - j->start_usec;
531 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
533 log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
535 log_info("Got %u%% of %s.", percent, j->url);
537 j->progress_percent = percent;
538 j->last_status_usec = n;
544 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
545 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
551 j = new0(ImportJob, 1);
555 j->state = IMPORT_JOB_INIT;
557 j->userdata = userdata;
559 j->content_length = (uint64_t) -1;
560 j->start_usec = now(CLOCK_MONOTONIC);
561 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
563 j->url = strdup(url);
573 int import_job_begin(ImportJob *j) {
578 if (j->state != IMPORT_JOB_INIT)
581 r = curl_glue_make(&j->curl, j->url, j);
585 if (!strv_isempty(j->old_etags)) {
586 _cleanup_free_ char *cc = NULL, *hdr = NULL;
588 cc = strv_join(j->old_etags, ", ");
592 hdr = strappend("If-None-Match: ", cc);
596 j->request_header = curl_slist_new(hdr, NULL);
597 if (!j->request_header)
600 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
604 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
607 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
610 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
613 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
616 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
619 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
622 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
625 r = curl_glue_add(j->glue, j->curl);
629 j->state = IMPORT_JOB_ANALYZING;