chiark / gitweb /
import: simplify dkr importer, by making use of generic import-job logic, used by...
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "import-job.h"
26
27 ImportJob* import_job_unref(ImportJob *j) {
28         if (!j)
29                 return NULL;
30
31         curl_glue_remove_and_free(j->glue, j->curl);
32         curl_slist_free_all(j->request_header);
33
34         safe_close(j->disk_fd);
35
36         if (j->compressed == IMPORT_JOB_XZ)
37                 lzma_end(&j->xz);
38         else if (j->compressed == IMPORT_JOB_GZIP)
39                 inflateEnd(&j->gzip);
40         else if (j->compressed == IMPORT_JOB_BZIP2)
41                 BZ2_bzDecompressEnd(&j->bzip2);
42
43         if (j->checksum_context)
44                 gcry_md_close(j->checksum_context);
45
46         free(j->url);
47         free(j->etag);
48         strv_free(j->old_etags);
49         free(j->payload);
50         free(j->checksum);
51
52         free(j);
53
54         return NULL;
55 }
56
57 static void import_job_finish(ImportJob *j, int ret) {
58         assert(j);
59
60         if (j->state == IMPORT_JOB_DONE ||
61             j->state == IMPORT_JOB_FAILED)
62                 return;
63
64         if (ret == 0) {
65                 j->state = IMPORT_JOB_DONE;
66                 log_info("Download of %s complete.", j->url);
67         } else {
68                 j->state = IMPORT_JOB_FAILED;
69                 j->error = ret;
70         }
71
72         if (j->on_finished)
73                 j->on_finished(j);
74 }
75
76 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
77         ImportJob *j = NULL;
78         CURLcode code;
79         long status;
80         int r;
81
82         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
83                 return;
84
85         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
86                 return;
87
88         if (result != CURLE_OK) {
89                 log_error("Transfer failed: %s", curl_easy_strerror(result));
90                 r = -EIO;
91                 goto finish;
92         }
93
94         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
95         if (code != CURLE_OK) {
96                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
97                 r = -EIO;
98                 goto finish;
99         } else if (status == 304) {
100                 log_info("Image already downloaded. Skipping download.");
101                 j->etag_exists = true;
102                 r = 0;
103                 goto finish;
104         } else if (status >= 300) {
105                 log_error("HTTP request to %s failed with code %li.", j->url, status);
106                 r = -EIO;
107                 goto finish;
108         } else if (status < 200) {
109                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
110                 r = -EIO;
111                 goto finish;
112         }
113
114         if (j->state != IMPORT_JOB_RUNNING) {
115                 log_error("Premature connection termination.");
116                 r = -EIO;
117                 goto finish;
118         }
119
120         if (j->content_length != (uint64_t) -1 &&
121             j->content_length != j->written_compressed) {
122                 log_error("Download truncated.");
123                 r = -EIO;
124                 goto finish;
125         }
126
127         if (j->checksum_context) {
128                 uint8_t *k;
129
130                 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
131                 if (!k) {
132                         log_error("Failed to get checksum.");
133                         r = -EIO;
134                         goto finish;
135                 }
136
137                 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
138                 if (!j->checksum) {
139                         r = log_oom();
140                         goto finish;
141                 }
142
143                 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
144         }
145
146         if (j->disk_fd >= 0 && j->allow_sparse) {
147                 /* Make sure the file size is right, in case the file was
148                  * sparse and we just seeked for the last part */
149
150                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
151                         log_error_errno(errno, "Failed to truncate file: %m");
152                         r = -errno;
153                         goto finish;
154                 }
155
156                 if (j->etag)
157                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
158                 if (j->url)
159                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
160
161                 if (j->mtime != 0) {
162                         struct timespec ut[2];
163
164                         timespec_store(&ut[0], j->mtime);
165                         ut[1] = ut[0];
166                         (void) futimens(j->disk_fd, ut);
167
168                         (void) fd_setcrtime(j->disk_fd, j->mtime);
169                 }
170         }
171
172         r = 0;
173
174 finish:
175         import_job_finish(j, r);
176 }
177
178 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
179         ssize_t n;
180
181         assert(j);
182         assert(p);
183
184         if (sz <= 0)
185                 return 0;
186
187         if (j->written_uncompressed + sz < j->written_uncompressed) {
188                 log_error("File too large, overflow");
189                 return -EOVERFLOW;
190         }
191
192         if (j->written_uncompressed + sz > j->uncompressed_max) {
193                 log_error("File overly large, refusing");
194                 return -EFBIG;
195         }
196
197         if (j->disk_fd >= 0) {
198
199                 if (j->allow_sparse)
200                         n = sparse_write(j->disk_fd, p, sz, 64);
201                 else
202                         n = write(j->disk_fd, p, sz);
203                 if (n < 0) {
204                         log_error_errno(errno, "Failed to write file: %m");
205                         return -errno;
206                 }
207                 if ((size_t) n < sz) {
208                         log_error("Short write");
209                         return -EIO;
210                 }
211         } else {
212
213                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
214                         return log_oom();
215
216                 memcpy(j->payload + j->payload_size, p, sz);
217                 j->payload_size += sz;
218         }
219
220         j->written_uncompressed += sz;
221
222         return 0;
223 }
224
225 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
226         int r;
227
228         assert(j);
229         assert(p);
230
231         if (sz <= 0)
232                 return 0;
233
234         if (j->written_compressed + sz < j->written_compressed) {
235                 log_error("File too large, overflow");
236                 return -EOVERFLOW;
237         }
238
239         if (j->written_compressed + sz > j->compressed_max) {
240                 log_error("File overly large, refusing.");
241                 return -EFBIG;
242         }
243
244         if (j->content_length != (uint64_t) -1 &&
245             j->written_compressed + sz > j->content_length) {
246                 log_error("Content length incorrect.");
247                 return -EFBIG;
248         }
249
250         if (j->checksum_context)
251                 gcry_md_write(j->checksum_context, p, sz);
252
253         switch (j->compressed) {
254
255         case IMPORT_JOB_UNCOMPRESSED:
256                 r = import_job_write_uncompressed(j, p, sz);
257                 if (r < 0)
258                         return r;
259
260                 break;
261
262         case IMPORT_JOB_XZ:
263                 j->xz.next_in = p;
264                 j->xz.avail_in = sz;
265
266                 while (j->xz.avail_in > 0) {
267                         uint8_t buffer[16 * 1024];
268                         lzma_ret lzr;
269
270                         j->xz.next_out = buffer;
271                         j->xz.avail_out = sizeof(buffer);
272
273                         lzr = lzma_code(&j->xz, LZMA_RUN);
274                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
275                                 log_error("Decompression error.");
276                                 return -EIO;
277                         }
278
279                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
280                         if (r < 0)
281                                 return r;
282                 }
283
284                 break;
285
286         case IMPORT_JOB_GZIP:
287                 j->gzip.next_in = p;
288                 j->gzip.avail_in = sz;
289
290                 while (j->gzip.avail_in > 0) {
291                         uint8_t buffer[16 * 1024];
292
293                         j->gzip.next_out = buffer;
294                         j->gzip.avail_out = sizeof(buffer);
295
296                         r = inflate(&j->gzip, Z_NO_FLUSH);
297                         if (r != Z_OK && r != Z_STREAM_END) {
298                                 log_error("Decompression error.");
299                                 return -EIO;
300                         }
301
302                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
303                         if (r < 0)
304                                 return r;
305                 }
306
307                 break;
308
309         case IMPORT_JOB_BZIP2:
310                 j->bzip2.next_in = p;
311                 j->bzip2.avail_in = sz;
312
313                 while (j->bzip2.avail_in > 0) {
314                         uint8_t buffer[16 * 1024];
315
316                         j->bzip2.next_out = (char*) buffer;
317                         j->bzip2.avail_out = sizeof(buffer);
318
319                         r = BZ2_bzDecompress(&j->bzip2);
320                         if (r != BZ_OK && r != BZ_STREAM_END) {
321                                 log_error("Decompression error.");
322                                 return -EIO;
323                         }
324
325                         r = import_job_write_uncompressed(j,  buffer, sizeof(buffer) - j->bzip2.avail_out);
326                         if (r < 0)
327                                 return r;
328                 }
329
330                 break;
331
332         default:
333                 assert_not_reached("Unknown compression");
334         }
335
336         j->written_compressed += sz;
337
338         return 0;
339 }
340
341 static int import_job_open_disk(ImportJob *j) {
342         int r;
343
344         assert(j);
345
346         if (j->on_open_disk) {
347                 r = j->on_open_disk(j);
348                 if (r < 0)
349                         return r;
350         }
351
352         if (j->disk_fd >= 0) {
353                 /* Check if we can do sparse files */
354
355                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
356                         j->allow_sparse = true;
357                 else {
358                         if (errno != ESPIPE)
359                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
360
361                         j->allow_sparse = false;
362                 }
363         }
364
365         if (j->calc_checksum) {
366                 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
367                         log_error("Failed to initialize hash context.");
368                         return -EIO;
369                 }
370         }
371
372         return 0;
373 }
374
375 static int import_job_detect_compression(ImportJob *j) {
376         static const uint8_t xz_signature[] = {
377                 0xfd, '7', 'z', 'X', 'Z', 0x00
378         };
379         static const uint8_t gzip_signature[] = {
380                 0x1f, 0x8b
381         };
382         static const uint8_t bzip2_signature[] = {
383                 'B', 'Z', 'h'
384         };
385
386         _cleanup_free_ uint8_t *stub = NULL;
387         size_t stub_size;
388
389         int r;
390
391         assert(j);
392
393         if (j->payload_size < MAX3(sizeof(xz_signature),
394                                    sizeof(gzip_signature),
395                                    sizeof(bzip2_signature)))
396                 return 0;
397
398         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
399                 j->compressed = IMPORT_JOB_XZ;
400         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
401                 j->compressed = IMPORT_JOB_GZIP;
402         else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0)
403                 j->compressed = IMPORT_JOB_BZIP2;
404         else
405                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
406
407         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
408         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
409         log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == IMPORT_JOB_BZIP2));
410
411         if (j->compressed == IMPORT_JOB_XZ) {
412                 lzma_ret xzr;
413
414                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
415                 if (xzr != LZMA_OK) {
416                         log_error("Failed to initialize XZ decoder.");
417                         return -EIO;
418                 }
419         }
420         if (j->compressed == IMPORT_JOB_GZIP) {
421                 r = inflateInit2(&j->gzip, 15+16);
422                 if (r != Z_OK) {
423                         log_error("Failed to initialize gzip decoder.");
424                         return -EIO;
425                 }
426         }
427         if (j->compressed == IMPORT_JOB_BZIP2) {
428                 r = BZ2_bzDecompressInit(&j->bzip2, 0, 0);
429                 if (r != BZ_OK) {
430                         log_error("Failed to initialize bzip2 decoder.");
431                         return -EIO;
432                 }
433         }
434
435         r = import_job_open_disk(j);
436         if (r < 0)
437                 return r;
438
439         /* Now, take the payload we read so far, and decompress it */
440         stub = j->payload;
441         stub_size = j->payload_size;
442
443         j->payload = NULL;
444         j->payload_size = 0;
445         j->payload_allocated = 0;
446
447         j->state = IMPORT_JOB_RUNNING;
448
449         r = import_job_write_compressed(j, stub, stub_size);
450         if (r < 0)
451                 return r;
452
453         return 0;
454 }
455
456 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
457         ImportJob *j = userdata;
458         size_t sz = size * nmemb;
459         int r;
460
461         assert(contents);
462         assert(j);
463
464         switch (j->state) {
465
466         case IMPORT_JOB_ANALYZING:
467                 /* Let's first check what it actually is */
468
469                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
470                         r = log_oom();
471                         goto fail;
472                 }
473
474                 memcpy(j->payload + j->payload_size, contents, sz);
475                 j->payload_size += sz;
476
477                 r = import_job_detect_compression(j);
478                 if (r < 0)
479                         goto fail;
480
481                 break;
482
483         case IMPORT_JOB_RUNNING:
484
485                 r = import_job_write_compressed(j, contents, sz);
486                 if (r < 0)
487                         goto fail;
488
489                 break;
490
491         case IMPORT_JOB_DONE:
492         case IMPORT_JOB_FAILED:
493                 r = -ESTALE;
494                 goto fail;
495
496         default:
497                 assert_not_reached("Impossible state.");
498         }
499
500         return sz;
501
502 fail:
503         import_job_finish(j, r);
504         return 0;
505 }
506
507 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
508         ImportJob *j = userdata;
509         size_t sz = size * nmemb;
510         _cleanup_free_ char *length = NULL, *last_modified = NULL;
511         char *etag;
512         int r;
513
514         assert(contents);
515         assert(j);
516
517         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
518                 r = -ESTALE;
519                 goto fail;
520         }
521
522         assert(j->state == IMPORT_JOB_ANALYZING);
523
524         r = curl_header_strdup(contents, sz, "ETag:", &etag);
525         if (r < 0) {
526                 log_oom();
527                 goto fail;
528         }
529         if (r > 0) {
530                 free(j->etag);
531                 j->etag = etag;
532
533                 if (strv_contains(j->old_etags, j->etag)) {
534                         log_info("Image already downloaded. Skipping download.");
535                         j->etag_exists = true;
536                         import_job_finish(j, 0);
537                         return sz;
538                 }
539
540                 return sz;
541         }
542
543         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
544         if (r < 0) {
545                 log_oom();
546                 goto fail;
547         }
548         if (r > 0) {
549                 (void) safe_atou64(length, &j->content_length);
550
551                 if (j->content_length != (uint64_t) -1) {
552                         char bytes[FORMAT_BYTES_MAX];
553
554                         if (j->content_length > j->compressed_max) {
555                                 log_error("Content too large.");
556                                 r = -EFBIG;
557                                 goto fail;
558                         }
559
560                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
561                 }
562
563                 return sz;
564         }
565
566         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
567         if (r < 0) {
568                 log_oom();
569                 goto fail;
570         }
571         if (r > 0) {
572                 (void) curl_parse_http_time(last_modified, &j->mtime);
573                 return sz;
574         }
575
576         if (j->on_header) {
577                 r = j->on_header(j, contents, sz);
578                 if (r < 0)
579                         goto fail;
580         }
581
582         return sz;
583
584 fail:
585         import_job_finish(j, r);
586         return 0;
587 }
588
589 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
590         ImportJob *j = userdata;
591         unsigned percent;
592         usec_t n;
593
594         assert(j);
595
596         if (dltotal <= 0)
597                 return 0;
598
599         percent = ((100 * dlnow) / dltotal);
600         n = now(CLOCK_MONOTONIC);
601
602         if (n > j->last_status_usec + USEC_PER_SEC &&
603             percent != j->progress_percent &&
604             dlnow < dltotal) {
605                 char buf[FORMAT_TIMESPAN_MAX];
606
607                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
608                         char y[FORMAT_BYTES_MAX];
609                         usec_t left, done;
610
611                         done = n - j->start_usec;
612                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
613
614                         log_info("Got %u%% of %s. %s left at %s/s.",
615                                  percent,
616                                  j->url,
617                                  format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
618                                  format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
619                 } else
620                         log_info("Got %u%% of %s.", percent, j->url);
621
622                 j->progress_percent = percent;
623                 j->last_status_usec = n;
624         }
625
626         return 0;
627 }
628
629 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
630         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
631
632         assert(url);
633         assert(glue);
634         assert(ret);
635
636         j = new0(ImportJob, 1);
637         if (!j)
638                 return -ENOMEM;
639
640         j->state = IMPORT_JOB_INIT;
641         j->disk_fd = -1;
642         j->userdata = userdata;
643         j->glue = glue;
644         j->content_length = (uint64_t) -1;
645         j->start_usec = now(CLOCK_MONOTONIC);
646         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
647
648         j->url = strdup(url);
649         if (!j->url)
650                 return -ENOMEM;
651
652         *ret = j;
653         j = NULL;
654
655         return 0;
656 }
657
658 int import_job_begin(ImportJob *j) {
659         int r;
660
661         assert(j);
662
663         if (j->state != IMPORT_JOB_INIT)
664                 return -EBUSY;
665
666         r = curl_glue_make(&j->curl, j->url, j);
667         if (r < 0)
668                 return r;
669
670         if (!strv_isempty(j->old_etags)) {
671                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
672
673                 cc = strv_join(j->old_etags, ", ");
674                 if (!cc)
675                         return -ENOMEM;
676
677                 hdr = strappend("If-None-Match: ", cc);
678                 if (!hdr)
679                         return -ENOMEM;
680
681                 if (!j->request_header) {
682                         j->request_header = curl_slist_new(hdr, NULL);
683                         if (!j->request_header)
684                                 return -ENOMEM;
685                 } else {
686                         struct curl_slist *l;
687
688                         l = curl_slist_append(j->request_header, hdr);
689                         if (!l)
690                                 return -ENOMEM;
691
692                         j->request_header = l;
693                 }
694         }
695
696         if (j->request_header) {
697                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
698                         return -EIO;
699         }
700
701         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
702                 return -EIO;
703
704         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
705                 return -EIO;
706
707         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
708                 return -EIO;
709
710         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
711                 return -EIO;
712
713         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
714                 return -EIO;
715
716         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
717                 return -EIO;
718
719         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
720                 return -EIO;
721
722         r = curl_glue_add(j->glue, j->curl);
723         if (r < 0)
724                 return r;
725
726         j->state = IMPORT_JOB_ANALYZING;
727
728         return 0;
729 }