chiark / gitweb /
6de32686c5003533f5a8ec3eed1fd97c20ea1038
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "import-job.h"
26
27 ImportJob* import_job_unref(ImportJob *j) {
28         if (!j)
29                 return NULL;
30
31         curl_glue_remove_and_free(j->glue, j->curl);
32         curl_slist_free_all(j->request_header);
33
34         safe_close(j->disk_fd);
35
36         if (j->compressed == IMPORT_JOB_XZ)
37                 lzma_end(&j->xz);
38         else if (j->compressed == IMPORT_JOB_GZIP)
39                 inflateEnd(&j->gzip);
40
41         if (j->hash_context)
42                 gcry_md_close(j->hash_context);
43
44         free(j->url);
45         free(j->etag);
46         strv_free(j->old_etags);
47         free(j->payload);
48         free(j->sha256);
49
50         free(j);
51
52         return NULL;
53 }
54
/* Cleanup helper for ImportJob* based on import_job_unref(). The macro is
 * defined outside this file; presumably it is what provides the
 * import_job_unrefp symbol that import_job_new() names in _cleanup_(). */
DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
56
57 static void import_job_finish(ImportJob *j, int ret) {
58         assert(j);
59
60         if (j->state == IMPORT_JOB_DONE ||
61             j->state == IMPORT_JOB_FAILED)
62                 return;
63
64         if (ret == 0) {
65                 j->state = IMPORT_JOB_DONE;
66                 log_info("Download of %s complete.", j->url);
67         } else {
68                 j->state = IMPORT_JOB_FAILED;
69                 j->error = ret;
70         }
71
72         if (j->on_finished)
73                 j->on_finished(j);
74 }
75
76 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
77         ImportJob *j = NULL;
78         CURLcode code;
79         long status;
80         int r;
81
82         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
83                 return;
84
85         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
86                 return;
87
88         if (result != CURLE_OK) {
89                 log_error("Transfer failed: %s", curl_easy_strerror(result));
90                 r = -EIO;
91                 goto finish;
92         }
93
94         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
95         if (code != CURLE_OK) {
96                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
97                 r = -EIO;
98                 goto finish;
99         } else if (status == 304) {
100                 log_info("Image already downloaded. Skipping download.");
101                 j->etag_exists = true;
102                 r = 0;
103                 goto finish;
104         } else if (status >= 300) {
105                 log_error("HTTP request to %s failed with code %li.", j->url, status);
106                 r = -EIO;
107                 goto finish;
108         } else if (status < 200) {
109                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
110                 r = -EIO;
111                 goto finish;
112         }
113
114         if (j->state != IMPORT_JOB_RUNNING) {
115                 log_error("Premature connection termination.");
116                 r = -EIO;
117                 goto finish;
118         }
119
120         if (j->content_length != (uint64_t) -1 &&
121             j->content_length != j->written_compressed) {
122                 log_error("Download truncated.");
123                 r = -EIO;
124                 goto finish;
125         }
126
127         if (j->hash_context) {
128                 uint8_t *k;
129
130                 k = gcry_md_read(j->hash_context, GCRY_MD_SHA256);
131                 if (!k) {
132                         log_error("Failed to get checksum.");
133                         r = -EIO;
134                         goto finish;
135                 }
136
137                 j->sha256 = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
138                 if (!j->sha256) {
139                         r = log_oom();
140                         goto finish;
141                 }
142
143                 log_debug("SHA256 of %s is %s.", j->url, j->sha256);
144         }
145
146         if (j->disk_fd >= 0 && j->allow_sparse) {
147                 /* Make sure the file size is right, in case the file was
148                  * sparse and we just seeked for the last part */
149
150                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
151                         log_error_errno(errno, "Failed to truncate file: %m");
152                         r = -errno;
153                         goto finish;
154                 }
155
156                 if (j->etag)
157                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
158                 if (j->url)
159                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
160
161                 if (j->mtime != 0) {
162                         struct timespec ut[2];
163
164                         timespec_store(&ut[0], j->mtime);
165                         ut[1] = ut[0];
166                         (void) futimens(j->disk_fd, ut);
167
168                         (void) fd_setcrtime(j->disk_fd, j->mtime);
169                 }
170         }
171
172         r = 0;
173
174 finish:
175         import_job_finish(j, r);
176 }
177
178 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
179         ssize_t n;
180
181         assert(j);
182         assert(p);
183         assert(sz > 0);
184
185         if (j->written_uncompressed + sz < j->written_uncompressed) {
186                 log_error("File too large, overflow");
187                 return -EOVERFLOW;
188         }
189
190         if (j->written_uncompressed + sz > j->uncompressed_max) {
191                 log_error("File overly large, refusing");
192                 return -EFBIG;
193         }
194
195         if (j->disk_fd >= 0) {
196
197                 if (j->allow_sparse)
198                         n = sparse_write(j->disk_fd, p, sz, 64);
199                 else
200                         n = write(j->disk_fd, p, sz);
201                 if (n < 0) {
202                         log_error_errno(errno, "Failed to write file: %m");
203                         return -errno;
204                 }
205                 if ((size_t) n < sz) {
206                         log_error("Short write");
207                         return -EIO;
208                 }
209         } else {
210
211                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
212                         return log_oom();
213
214                 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
215                 j->payload_size += sz;
216         }
217
218         j->written_uncompressed += sz;
219
220         return 0;
221 }
222
223 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
224         int r;
225
226         assert(j);
227         assert(p);
228         assert(sz > 0);
229
230         if (j->written_compressed + sz < j->written_compressed) {
231                 log_error("File too large, overflow");
232                 return -EOVERFLOW;
233         }
234
235         if (j->written_compressed + sz > j->compressed_max) {
236                 log_error("File overly large, refusing.");
237                 return -EFBIG;
238         }
239
240         if (j->content_length != (uint64_t) -1 &&
241             j->written_compressed + sz > j->content_length) {
242                 log_error("Content length incorrect.");
243                 return -EFBIG;
244         }
245
246         if (j->hash_context)
247                 gcry_md_write(j->hash_context, p, sz);
248
249         switch (j->compressed) {
250
251         case IMPORT_JOB_UNCOMPRESSED:
252                 r = import_job_write_uncompressed(j, p, sz);
253                 if (r < 0)
254                         return r;
255
256                 break;
257
258         case IMPORT_JOB_XZ:
259                 j->xz.next_in = p;
260                 j->xz.avail_in = sz;
261
262                 while (j->xz.avail_in > 0) {
263                         uint8_t buffer[16 * 1024];
264                         lzma_ret lzr;
265
266                         j->xz.next_out = buffer;
267                         j->xz.avail_out = sizeof(buffer);
268
269                         lzr = lzma_code(&j->xz, LZMA_RUN);
270                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
271                                 log_error("Decompression error.");
272                                 return -EIO;
273                         }
274
275                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
276                         if (r < 0)
277                                 return r;
278                 }
279
280                 break;
281
282         case IMPORT_JOB_GZIP:
283                 j->gzip.next_in = p;
284                 j->gzip.avail_in = sz;
285
286                 while (j->gzip.avail_in > 0) {
287                         uint8_t buffer[16 * 1024];
288
289                         j->gzip.next_out = buffer;
290                         j->gzip.avail_out = sizeof(buffer);
291
292                         r = inflate(&j->gzip, Z_NO_FLUSH);
293                         if (r != Z_OK && r != Z_STREAM_END) {
294                                 log_error("Decompression error.");
295                                 return -EIO;
296                         }
297
298                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
299                         if (r < 0)
300                                 return r;
301                 }
302
303                 break;
304
305         default:
306                 assert_not_reached("Unknown compression");
307         }
308
309         j->written_compressed += sz;
310
311         return 0;
312 }
313
314 static int import_job_open_disk(ImportJob *j) {
315         int r;
316
317         assert(j);
318
319         if (j->on_open_disk) {
320                 r = j->on_open_disk(j);
321                 if (r < 0)
322                         return r;
323         }
324
325         if (j->disk_fd >= 0) {
326                 /* Check if we can do sparse files */
327
328                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
329                         j->allow_sparse = true;
330                 else {
331                         if (errno != ESPIPE)
332                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
333
334                         j->allow_sparse = false;
335                 }
336         }
337
338         if (j->calc_hash) {
339                 if (gcry_md_open(&j->hash_context, GCRY_MD_SHA256, 0) != 0) {
340                         log_error("Failed to initialize hash context.");
341                         return -EIO;
342                 }
343         }
344
345         return 0;
346 }
347
348 static int import_job_detect_compression(ImportJob *j) {
349         static const uint8_t xz_signature[] = {
350                 0xfd, '7', 'z', 'X', 'Z', 0x00
351         };
352         static const uint8_t gzip_signature[] = {
353                 0x1f, 0x8b
354         };
355
356         _cleanup_free_ uint8_t *stub = NULL;
357         size_t stub_size;
358
359         int r;
360
361         assert(j);
362
363         if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
364                 return 0;
365
366         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
367                 j->compressed = IMPORT_JOB_XZ;
368         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
369                 j->compressed = IMPORT_JOB_GZIP;
370         else
371                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
372
373         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
374         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
375
376         if (j->compressed == IMPORT_JOB_XZ) {
377                 lzma_ret xzr;
378
379                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
380                 if (xzr != LZMA_OK) {
381                         log_error("Failed to initialize XZ decoder.");
382                         return -EIO;
383                 }
384         }
385         if (j->compressed == IMPORT_JOB_GZIP) {
386                 r = inflateInit2(&j->gzip, 15+16);
387                 if (r != Z_OK) {
388                         log_error("Failed to initialize gzip decoder.");
389                         return -EIO;
390                 }
391         }
392
393         r = import_job_open_disk(j);
394         if (r < 0)
395                 return r;
396
397         /* Now, take the payload we read so far, and decompress it */
398         stub = j->payload;
399         stub_size = j->payload_size;
400
401         j->payload = NULL;
402         j->payload_size = 0;
403         j->payload_allocated = 0;
404
405         j->state = IMPORT_JOB_RUNNING;
406
407         r = import_job_write_compressed(j, stub, stub_size);
408         if (r < 0)
409                 return r;
410
411         return 0;
412 }
413
414 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
415         ImportJob *j = userdata;
416         size_t sz = size * nmemb;
417         int r;
418
419         assert(contents);
420         assert(j);
421
422         switch (j->state) {
423
424         case IMPORT_JOB_ANALYZING:
425                 /* Let's first check what it actually is */
426
427                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
428                         r = log_oom();
429                         goto fail;
430                 }
431
432                 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
433                 j->payload_size += sz;
434
435                 r = import_job_detect_compression(j);
436                 if (r < 0)
437                         goto fail;
438
439                 break;
440
441         case IMPORT_JOB_RUNNING:
442
443                 r = import_job_write_compressed(j, contents, sz);
444                 if (r < 0)
445                         goto fail;
446
447                 break;
448
449         case IMPORT_JOB_DONE:
450         case IMPORT_JOB_FAILED:
451                 r = -ESTALE;
452                 goto fail;
453
454         default:
455                 assert_not_reached("Impossible state.");
456         }
457
458         return sz;
459
460 fail:
461         import_job_finish(j, r);
462         return 0;
463 }
464
465 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
466         ImportJob *j = userdata;
467         size_t sz = size * nmemb;
468         _cleanup_free_ char *length = NULL, *last_modified = NULL;
469         char *etag;
470         int r;
471
472         assert(contents);
473         assert(j);
474
475         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
476                 r = -ESTALE;
477                 goto fail;
478         }
479
480         assert(j->state == IMPORT_JOB_ANALYZING);
481
482         r = curl_header_strdup(contents, sz, "ETag:", &etag);
483         if (r < 0) {
484                 log_oom();
485                 goto fail;
486         }
487         if (r > 0) {
488                 free(j->etag);
489                 j->etag = etag;
490
491                 if (strv_contains(j->old_etags, j->etag)) {
492                         log_info("Image already downloaded. Skipping download.");
493                         j->etag_exists = true;
494                         import_job_finish(j, 0);
495                         return sz;
496                 }
497
498                 return sz;
499         }
500
501         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
502         if (r < 0) {
503                 log_oom();
504                 goto fail;
505         }
506         if (r > 0) {
507                 (void) safe_atou64(length, &j->content_length);
508
509                 if (j->content_length != (uint64_t) -1) {
510                         char bytes[FORMAT_BYTES_MAX];
511
512                         if (j->content_length > j->compressed_max) {
513                                 log_error("Content too large.");
514                                 r = -EFBIG;
515                                 goto fail;
516                         }
517
518                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
519                 }
520
521                 return sz;
522         }
523
524         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
525         if (r < 0) {
526                 log_oom();
527                 goto fail;
528         }
529         if (r > 0) {
530                 (void) curl_parse_http_time(last_modified, &j->mtime);
531                 return sz;
532         }
533
534         return sz;
535
536 fail:
537         import_job_finish(j, r);
538         return 0;
539 }
540
541 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
542         ImportJob *j = userdata;
543         unsigned percent;
544         usec_t n;
545
546         assert(j);
547
548         if (dltotal <= 0)
549                 return 0;
550
551         percent = ((100 * dlnow) / dltotal);
552         n = now(CLOCK_MONOTONIC);
553
554         if (n > j->last_status_usec + USEC_PER_SEC &&
555             percent != j->progress_percent &&
556             dlnow < dltotal) {
557                 char buf[FORMAT_TIMESPAN_MAX];
558
559                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
560                         usec_t left, done;
561
562                         done = n - j->start_usec;
563                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
564
565                         log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
566                 } else
567                         log_info("Got %u%% of %s.", percent, j->url);
568
569                 j->progress_percent = percent;
570                 j->last_status_usec = n;
571         }
572
573         return 0;
574 }
575
576 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
577         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
578
579         assert(url);
580         assert(glue);
581         assert(ret);
582
583         j = new0(ImportJob, 1);
584         if (!j)
585                 return -ENOMEM;
586
587         j->state = IMPORT_JOB_INIT;
588         j->disk_fd = -1;
589         j->userdata = userdata;
590         j->glue = glue;
591         j->content_length = (uint64_t) -1;
592         j->start_usec = now(CLOCK_MONOTONIC);
593         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
594
595         j->url = strdup(url);
596         if (!j->url)
597                 return -ENOMEM;
598
599         *ret = j;
600         j = NULL;
601
602         return 0;
603 }
604
605 int import_job_begin(ImportJob *j) {
606         int r;
607
608         assert(j);
609
610         if (j->state != IMPORT_JOB_INIT)
611                 return -EBUSY;
612
613         r = curl_glue_make(&j->curl, j->url, j);
614         if (r < 0)
615                 return r;
616
617         if (!strv_isempty(j->old_etags)) {
618                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
619
620                 cc = strv_join(j->old_etags, ", ");
621                 if (!cc)
622                         return -ENOMEM;
623
624                 hdr = strappend("If-None-Match: ", cc);
625                 if (!hdr)
626                         return -ENOMEM;
627
628                 j->request_header = curl_slist_new(hdr, NULL);
629                 if (!j->request_header)
630                         return -ENOMEM;
631
632                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
633                         return -EIO;
634         }
635
636         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
637                 return -EIO;
638
639         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
640                 return -EIO;
641
642         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
643                 return -EIO;
644
645         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
646                 return -EIO;
647
648         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
649                 return -EIO;
650
651         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
652                 return -EIO;
653
654         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
655                 return -EIO;
656
657         r = curl_glue_add(j->glue, j->curl);
658         if (r < 0)
659                 return r;
660
661         j->state = IMPORT_JOB_ANALYZING;
662
663         return 0;
664 }