chiark / gitweb /
980b639b5db8d219d70ccda35af0e0d010b574a6
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "machine-pool.h"
26 #include "import-job.h"
27
/* Number of bytes written to disk between two attempts to grow the
 * /var/lib/machines directory: 10 MiB (10 << 20). */
#define IMPORT_GROW_INTERVAL_BYTES (UINT64_C(10) << 20)
30
31 ImportJob* import_job_unref(ImportJob *j) {
32         if (!j)
33                 return NULL;
34
35         curl_glue_remove_and_free(j->glue, j->curl);
36         curl_slist_free_all(j->request_header);
37
38         safe_close(j->disk_fd);
39
40         if (j->compressed == IMPORT_JOB_XZ)
41                 lzma_end(&j->xz);
42         else if (j->compressed == IMPORT_JOB_GZIP)
43                 inflateEnd(&j->gzip);
44         else if (j->compressed == IMPORT_JOB_BZIP2)
45                 BZ2_bzDecompressEnd(&j->bzip2);
46
47         if (j->checksum_context)
48                 gcry_md_close(j->checksum_context);
49
50         free(j->url);
51         free(j->etag);
52         strv_free(j->old_etags);
53         free(j->payload);
54         free(j->checksum);
55
56         free(j);
57
58         return NULL;
59 }
60
61 static void import_job_finish(ImportJob *j, int ret) {
62         assert(j);
63
64         if (j->state == IMPORT_JOB_DONE ||
65             j->state == IMPORT_JOB_FAILED)
66                 return;
67
68         if (ret == 0) {
69                 j->state = IMPORT_JOB_DONE;
70                 j->progress_percent = 100;
71                 log_info("Download of %s complete.", j->url);
72         } else {
73                 j->state = IMPORT_JOB_FAILED;
74                 j->error = ret;
75         }
76
77         if (j->on_finished)
78                 j->on_finished(j);
79 }
80
81 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
82         ImportJob *j = NULL;
83         CURLcode code;
84         long status;
85         int r;
86
87         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
88                 return;
89
90         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
91                 return;
92
93         if (result != CURLE_OK) {
94                 log_error("Transfer failed: %s", curl_easy_strerror(result));
95                 r = -EIO;
96                 goto finish;
97         }
98
99         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
100         if (code != CURLE_OK) {
101                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
102                 r = -EIO;
103                 goto finish;
104         } else if (status == 304) {
105                 log_info("Image already downloaded. Skipping download.");
106                 j->etag_exists = true;
107                 r = 0;
108                 goto finish;
109         } else if (status >= 300) {
110                 log_error("HTTP request to %s failed with code %li.", j->url, status);
111                 r = -EIO;
112                 goto finish;
113         } else if (status < 200) {
114                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
115                 r = -EIO;
116                 goto finish;
117         }
118
119         if (j->state != IMPORT_JOB_RUNNING) {
120                 log_error("Premature connection termination.");
121                 r = -EIO;
122                 goto finish;
123         }
124
125         if (j->content_length != (uint64_t) -1 &&
126             j->content_length != j->written_compressed) {
127                 log_error("Download truncated.");
128                 r = -EIO;
129                 goto finish;
130         }
131
132         if (j->checksum_context) {
133                 uint8_t *k;
134
135                 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
136                 if (!k) {
137                         log_error("Failed to get checksum.");
138                         r = -EIO;
139                         goto finish;
140                 }
141
142                 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
143                 if (!j->checksum) {
144                         r = log_oom();
145                         goto finish;
146                 }
147
148                 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
149         }
150
151         if (j->disk_fd >= 0 && j->allow_sparse) {
152                 /* Make sure the file size is right, in case the file was
153                  * sparse and we just seeked for the last part */
154
155                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
156                         log_error_errno(errno, "Failed to truncate file: %m");
157                         r = -errno;
158                         goto finish;
159                 }
160
161                 if (j->etag)
162                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
163                 if (j->url)
164                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
165
166                 if (j->mtime != 0) {
167                         struct timespec ut[2];
168
169                         timespec_store(&ut[0], j->mtime);
170                         ut[1] = ut[0];
171                         (void) futimens(j->disk_fd, ut);
172
173                         (void) fd_setcrtime(j->disk_fd, j->mtime);
174                 }
175         }
176
177         r = 0;
178
179 finish:
180         import_job_finish(j, r);
181 }
182
183 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
184         ssize_t n;
185
186         assert(j);
187         assert(p);
188
189         if (sz <= 0)
190                 return 0;
191
192         if (j->written_uncompressed + sz < j->written_uncompressed) {
193                 log_error("File too large, overflow");
194                 return -EOVERFLOW;
195         }
196
197         if (j->written_uncompressed + sz > j->uncompressed_max) {
198                 log_error("File overly large, refusing");
199                 return -EFBIG;
200         }
201
202         if (j->disk_fd >= 0) {
203
204                 if (j->grow_machine_directory && j->written_since_last_grow >= IMPORT_GROW_INTERVAL_BYTES) {
205                         j->written_since_last_grow = 0;
206                         grow_machine_directory();
207                 }
208
209                 if (j->allow_sparse)
210                         n = sparse_write(j->disk_fd, p, sz, 64);
211                 else
212                         n = write(j->disk_fd, p, sz);
213                 if (n < 0) {
214                         log_error_errno(errno, "Failed to write file: %m");
215                         return -errno;
216                 }
217                 if ((size_t) n < sz) {
218                         log_error("Short write");
219                         return -EIO;
220                 }
221         } else {
222
223                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
224                         return log_oom();
225
226                 memcpy(j->payload + j->payload_size, p, sz);
227                 j->payload_size += sz;
228         }
229
230         j->written_uncompressed += sz;
231         j->written_since_last_grow += sz;
232
233         return 0;
234 }
235
236 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
237         int r;
238
239         assert(j);
240         assert(p);
241
242         if (sz <= 0)
243                 return 0;
244
245         if (j->written_compressed + sz < j->written_compressed) {
246                 log_error("File too large, overflow");
247                 return -EOVERFLOW;
248         }
249
250         if (j->written_compressed + sz > j->compressed_max) {
251                 log_error("File overly large, refusing.");
252                 return -EFBIG;
253         }
254
255         if (j->content_length != (uint64_t) -1 &&
256             j->written_compressed + sz > j->content_length) {
257                 log_error("Content length incorrect.");
258                 return -EFBIG;
259         }
260
261         if (j->checksum_context)
262                 gcry_md_write(j->checksum_context, p, sz);
263
264         switch (j->compressed) {
265
266         case IMPORT_JOB_UNCOMPRESSED:
267                 r = import_job_write_uncompressed(j, p, sz);
268                 if (r < 0)
269                         return r;
270
271                 break;
272
273         case IMPORT_JOB_XZ:
274                 j->xz.next_in = p;
275                 j->xz.avail_in = sz;
276
277                 while (j->xz.avail_in > 0) {
278                         uint8_t buffer[16 * 1024];
279                         lzma_ret lzr;
280
281                         j->xz.next_out = buffer;
282                         j->xz.avail_out = sizeof(buffer);
283
284                         lzr = lzma_code(&j->xz, LZMA_RUN);
285                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
286                                 log_error("Decompression error.");
287                                 return -EIO;
288                         }
289
290                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
291                         if (r < 0)
292                                 return r;
293                 }
294
295                 break;
296
297         case IMPORT_JOB_GZIP:
298                 j->gzip.next_in = p;
299                 j->gzip.avail_in = sz;
300
301                 while (j->gzip.avail_in > 0) {
302                         uint8_t buffer[16 * 1024];
303
304                         j->gzip.next_out = buffer;
305                         j->gzip.avail_out = sizeof(buffer);
306
307                         r = inflate(&j->gzip, Z_NO_FLUSH);
308                         if (r != Z_OK && r != Z_STREAM_END) {
309                                 log_error("Decompression error.");
310                                 return -EIO;
311                         }
312
313                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
314                         if (r < 0)
315                                 return r;
316                 }
317
318                 break;
319
320         case IMPORT_JOB_BZIP2:
321                 j->bzip2.next_in = p;
322                 j->bzip2.avail_in = sz;
323
324                 while (j->bzip2.avail_in > 0) {
325                         uint8_t buffer[16 * 1024];
326
327                         j->bzip2.next_out = (char*) buffer;
328                         j->bzip2.avail_out = sizeof(buffer);
329
330                         r = BZ2_bzDecompress(&j->bzip2);
331                         if (r != BZ_OK && r != BZ_STREAM_END) {
332                                 log_error("Decompression error.");
333                                 return -EIO;
334                         }
335
336                         r = import_job_write_uncompressed(j,  buffer, sizeof(buffer) - j->bzip2.avail_out);
337                         if (r < 0)
338                                 return r;
339                 }
340
341                 break;
342
343         default:
344                 assert_not_reached("Unknown compression");
345         }
346
347         j->written_compressed += sz;
348
349         return 0;
350 }
351
352 static int import_job_open_disk(ImportJob *j) {
353         int r;
354
355         assert(j);
356
357         if (j->on_open_disk) {
358                 r = j->on_open_disk(j);
359                 if (r < 0)
360                         return r;
361         }
362
363         if (j->disk_fd >= 0) {
364                 /* Check if we can do sparse files */
365
366                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
367                         j->allow_sparse = true;
368                 else {
369                         if (errno != ESPIPE)
370                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
371
372                         j->allow_sparse = false;
373                 }
374         }
375
376         if (j->calc_checksum) {
377                 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
378                         log_error("Failed to initialize hash context.");
379                         return -EIO;
380                 }
381         }
382
383         return 0;
384 }
385
386 static int import_job_detect_compression(ImportJob *j) {
387         static const uint8_t xz_signature[] = {
388                 0xfd, '7', 'z', 'X', 'Z', 0x00
389         };
390         static const uint8_t gzip_signature[] = {
391                 0x1f, 0x8b
392         };
393         static const uint8_t bzip2_signature[] = {
394                 'B', 'Z', 'h'
395         };
396
397         _cleanup_free_ uint8_t *stub = NULL;
398         size_t stub_size;
399
400         int r;
401
402         assert(j);
403
404         if (j->payload_size < MAX3(sizeof(xz_signature),
405                                    sizeof(gzip_signature),
406                                    sizeof(bzip2_signature)))
407                 return 0;
408
409         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
410                 j->compressed = IMPORT_JOB_XZ;
411         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
412                 j->compressed = IMPORT_JOB_GZIP;
413         else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0)
414                 j->compressed = IMPORT_JOB_BZIP2;
415         else
416                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
417
418         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
419         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
420         log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == IMPORT_JOB_BZIP2));
421
422         if (j->compressed == IMPORT_JOB_XZ) {
423                 lzma_ret xzr;
424
425                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
426                 if (xzr != LZMA_OK) {
427                         log_error("Failed to initialize XZ decoder.");
428                         return -EIO;
429                 }
430         }
431         if (j->compressed == IMPORT_JOB_GZIP) {
432                 r = inflateInit2(&j->gzip, 15+16);
433                 if (r != Z_OK) {
434                         log_error("Failed to initialize gzip decoder.");
435                         return -EIO;
436                 }
437         }
438         if (j->compressed == IMPORT_JOB_BZIP2) {
439                 r = BZ2_bzDecompressInit(&j->bzip2, 0, 0);
440                 if (r != BZ_OK) {
441                         log_error("Failed to initialize bzip2 decoder.");
442                         return -EIO;
443                 }
444         }
445
446         r = import_job_open_disk(j);
447         if (r < 0)
448                 return r;
449
450         /* Now, take the payload we read so far, and decompress it */
451         stub = j->payload;
452         stub_size = j->payload_size;
453
454         j->payload = NULL;
455         j->payload_size = 0;
456         j->payload_allocated = 0;
457
458         j->state = IMPORT_JOB_RUNNING;
459
460         r = import_job_write_compressed(j, stub, stub_size);
461         if (r < 0)
462                 return r;
463
464         return 0;
465 }
466
467 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
468         ImportJob *j = userdata;
469         size_t sz = size * nmemb;
470         int r;
471
472         assert(contents);
473         assert(j);
474
475         switch (j->state) {
476
477         case IMPORT_JOB_ANALYZING:
478                 /* Let's first check what it actually is */
479
480                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
481                         r = log_oom();
482                         goto fail;
483                 }
484
485                 memcpy(j->payload + j->payload_size, contents, sz);
486                 j->payload_size += sz;
487
488                 r = import_job_detect_compression(j);
489                 if (r < 0)
490                         goto fail;
491
492                 break;
493
494         case IMPORT_JOB_RUNNING:
495
496                 r = import_job_write_compressed(j, contents, sz);
497                 if (r < 0)
498                         goto fail;
499
500                 break;
501
502         case IMPORT_JOB_DONE:
503         case IMPORT_JOB_FAILED:
504                 r = -ESTALE;
505                 goto fail;
506
507         default:
508                 assert_not_reached("Impossible state.");
509         }
510
511         return sz;
512
513 fail:
514         import_job_finish(j, r);
515         return 0;
516 }
517
518 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
519         ImportJob *j = userdata;
520         size_t sz = size * nmemb;
521         _cleanup_free_ char *length = NULL, *last_modified = NULL;
522         char *etag;
523         int r;
524
525         assert(contents);
526         assert(j);
527
528         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
529                 r = -ESTALE;
530                 goto fail;
531         }
532
533         assert(j->state == IMPORT_JOB_ANALYZING);
534
535         r = curl_header_strdup(contents, sz, "ETag:", &etag);
536         if (r < 0) {
537                 log_oom();
538                 goto fail;
539         }
540         if (r > 0) {
541                 free(j->etag);
542                 j->etag = etag;
543
544                 if (strv_contains(j->old_etags, j->etag)) {
545                         log_info("Image already downloaded. Skipping download.");
546                         j->etag_exists = true;
547                         import_job_finish(j, 0);
548                         return sz;
549                 }
550
551                 return sz;
552         }
553
554         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
555         if (r < 0) {
556                 log_oom();
557                 goto fail;
558         }
559         if (r > 0) {
560                 (void) safe_atou64(length, &j->content_length);
561
562                 if (j->content_length != (uint64_t) -1) {
563                         char bytes[FORMAT_BYTES_MAX];
564
565                         if (j->content_length > j->compressed_max) {
566                                 log_error("Content too large.");
567                                 r = -EFBIG;
568                                 goto fail;
569                         }
570
571                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
572                 }
573
574                 return sz;
575         }
576
577         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
578         if (r < 0) {
579                 log_oom();
580                 goto fail;
581         }
582         if (r > 0) {
583                 (void) curl_parse_http_time(last_modified, &j->mtime);
584                 return sz;
585         }
586
587         if (j->on_header) {
588                 r = j->on_header(j, contents, sz);
589                 if (r < 0)
590                         goto fail;
591         }
592
593         return sz;
594
595 fail:
596         import_job_finish(j, r);
597         return 0;
598 }
599
600 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
601         ImportJob *j = userdata;
602         unsigned percent;
603         usec_t n;
604
605         assert(j);
606
607         if (dltotal <= 0)
608                 return 0;
609
610         percent = ((100 * dlnow) / dltotal);
611         n = now(CLOCK_MONOTONIC);
612
613         if (n > j->last_status_usec + USEC_PER_SEC &&
614             percent != j->progress_percent &&
615             dlnow < dltotal) {
616                 char buf[FORMAT_TIMESPAN_MAX];
617
618                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
619                         char y[FORMAT_BYTES_MAX];
620                         usec_t left, done;
621
622                         done = n - j->start_usec;
623                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
624
625                         log_info("Got %u%% of %s. %s left at %s/s.",
626                                  percent,
627                                  j->url,
628                                  format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
629                                  format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
630                 } else
631                         log_info("Got %u%% of %s.", percent, j->url);
632
633                 j->progress_percent = percent;
634                 j->last_status_usec = n;
635
636                 if (j->on_progress)
637                         j->on_progress(j);
638         }
639
640         return 0;
641 }
642
643 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
644         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
645
646         assert(url);
647         assert(glue);
648         assert(ret);
649
650         j = new0(ImportJob, 1);
651         if (!j)
652                 return -ENOMEM;
653
654         j->state = IMPORT_JOB_INIT;
655         j->disk_fd = -1;
656         j->userdata = userdata;
657         j->glue = glue;
658         j->content_length = (uint64_t) -1;
659         j->start_usec = now(CLOCK_MONOTONIC);
660         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
661
662         j->url = strdup(url);
663         if (!j->url)
664                 return -ENOMEM;
665
666         *ret = j;
667         j = NULL;
668
669         return 0;
670 }
671
672 int import_job_begin(ImportJob *j) {
673         int r;
674
675         assert(j);
676
677         if (j->state != IMPORT_JOB_INIT)
678                 return -EBUSY;
679
680         if (j->grow_machine_directory)
681                 grow_machine_directory();
682
683         r = curl_glue_make(&j->curl, j->url, j);
684         if (r < 0)
685                 return r;
686
687         if (!strv_isempty(j->old_etags)) {
688                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
689
690                 cc = strv_join(j->old_etags, ", ");
691                 if (!cc)
692                         return -ENOMEM;
693
694                 hdr = strappend("If-None-Match: ", cc);
695                 if (!hdr)
696                         return -ENOMEM;
697
698                 if (!j->request_header) {
699                         j->request_header = curl_slist_new(hdr, NULL);
700                         if (!j->request_header)
701                                 return -ENOMEM;
702                 } else {
703                         struct curl_slist *l;
704
705                         l = curl_slist_append(j->request_header, hdr);
706                         if (!l)
707                                 return -ENOMEM;
708
709                         j->request_header = l;
710                 }
711         }
712
713         if (j->request_header) {
714                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
715                         return -EIO;
716         }
717
718         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
719                 return -EIO;
720
721         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
722                 return -EIO;
723
724         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
725                 return -EIO;
726
727         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
728                 return -EIO;
729
730         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
731                 return -EIO;
732
733         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
734                 return -EIO;
735
736         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
737                 return -EIO;
738
739         r = curl_glue_add(j->glue, j->curl);
740         if (r < 0)
741                 return r;
742
743         j->state = IMPORT_JOB_ANALYZING;
744
745         return 0;
746 }