chiark / gitweb /
322aa1a18c5c464e453b877a3af430e8336e0179
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "import-job.h"
26
27 ImportJob* import_job_unref(ImportJob *j) {
28         if (!j)
29                 return NULL;
30
31         curl_glue_remove_and_free(j->glue, j->curl);
32         curl_slist_free_all(j->request_header);
33
34         safe_close(j->disk_fd);
35
36         if (j->compressed == IMPORT_JOB_XZ)
37                 lzma_end(&j->xz);
38         else if (j->compressed == IMPORT_JOB_GZIP)
39                 inflateEnd(&j->gzip);
40
41         if (j->checksum_context)
42                 gcry_md_close(j->checksum_context);
43
44         free(j->url);
45         free(j->etag);
46         strv_free(j->old_etags);
47         free(j->payload);
48         free(j->checksum);
49
50         free(j);
51
52         return NULL;
53 }
54
55 static void import_job_finish(ImportJob *j, int ret) {
56         assert(j);
57
58         if (j->state == IMPORT_JOB_DONE ||
59             j->state == IMPORT_JOB_FAILED)
60                 return;
61
62         if (ret == 0) {
63                 j->state = IMPORT_JOB_DONE;
64                 log_info("Download of %s complete.", j->url);
65         } else {
66                 j->state = IMPORT_JOB_FAILED;
67                 j->error = ret;
68         }
69
70         if (j->on_finished)
71                 j->on_finished(j);
72 }
73
74 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
75         ImportJob *j = NULL;
76         CURLcode code;
77         long status;
78         int r;
79
80         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
81                 return;
82
83         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
84                 return;
85
86         if (result != CURLE_OK) {
87                 log_error("Transfer failed: %s", curl_easy_strerror(result));
88                 r = -EIO;
89                 goto finish;
90         }
91
92         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
93         if (code != CURLE_OK) {
94                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
95                 r = -EIO;
96                 goto finish;
97         } else if (status == 304) {
98                 log_info("Image already downloaded. Skipping download.");
99                 j->etag_exists = true;
100                 r = 0;
101                 goto finish;
102         } else if (status >= 300) {
103                 log_error("HTTP request to %s failed with code %li.", j->url, status);
104                 r = -EIO;
105                 goto finish;
106         } else if (status < 200) {
107                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
108                 r = -EIO;
109                 goto finish;
110         }
111
112         if (j->state != IMPORT_JOB_RUNNING) {
113                 log_error("Premature connection termination.");
114                 r = -EIO;
115                 goto finish;
116         }
117
118         if (j->content_length != (uint64_t) -1 &&
119             j->content_length != j->written_compressed) {
120                 log_error("Download truncated.");
121                 r = -EIO;
122                 goto finish;
123         }
124
125         if (j->checksum_context) {
126                 uint8_t *k;
127
128                 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
129                 if (!k) {
130                         log_error("Failed to get checksum.");
131                         r = -EIO;
132                         goto finish;
133                 }
134
135                 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
136                 if (!j->checksum) {
137                         r = log_oom();
138                         goto finish;
139                 }
140
141                 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
142         }
143
144         if (j->disk_fd >= 0 && j->allow_sparse) {
145                 /* Make sure the file size is right, in case the file was
146                  * sparse and we just seeked for the last part */
147
148                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
149                         log_error_errno(errno, "Failed to truncate file: %m");
150                         r = -errno;
151                         goto finish;
152                 }
153
154                 if (j->etag)
155                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
156                 if (j->url)
157                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
158
159                 if (j->mtime != 0) {
160                         struct timespec ut[2];
161
162                         timespec_store(&ut[0], j->mtime);
163                         ut[1] = ut[0];
164                         (void) futimens(j->disk_fd, ut);
165
166                         (void) fd_setcrtime(j->disk_fd, j->mtime);
167                 }
168         }
169
170         r = 0;
171
172 finish:
173         import_job_finish(j, r);
174 }
175
176 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
177         ssize_t n;
178
179         assert(j);
180         assert(p);
181         assert(sz > 0);
182
183         if (j->written_uncompressed + sz < j->written_uncompressed) {
184                 log_error("File too large, overflow");
185                 return -EOVERFLOW;
186         }
187
188         if (j->written_uncompressed + sz > j->uncompressed_max) {
189                 log_error("File overly large, refusing");
190                 return -EFBIG;
191         }
192
193         if (j->disk_fd >= 0) {
194
195                 if (j->allow_sparse)
196                         n = sparse_write(j->disk_fd, p, sz, 64);
197                 else
198                         n = write(j->disk_fd, p, sz);
199                 if (n < 0) {
200                         log_error_errno(errno, "Failed to write file: %m");
201                         return -errno;
202                 }
203                 if ((size_t) n < sz) {
204                         log_error("Short write");
205                         return -EIO;
206                 }
207         } else {
208
209                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
210                         return log_oom();
211
212                 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
213                 j->payload_size += sz;
214         }
215
216         j->written_uncompressed += sz;
217
218         return 0;
219 }
220
221 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
222         int r;
223
224         assert(j);
225         assert(p);
226         assert(sz > 0);
227
228         if (j->written_compressed + sz < j->written_compressed) {
229                 log_error("File too large, overflow");
230                 return -EOVERFLOW;
231         }
232
233         if (j->written_compressed + sz > j->compressed_max) {
234                 log_error("File overly large, refusing.");
235                 return -EFBIG;
236         }
237
238         if (j->content_length != (uint64_t) -1 &&
239             j->written_compressed + sz > j->content_length) {
240                 log_error("Content length incorrect.");
241                 return -EFBIG;
242         }
243
244         if (j->checksum_context)
245                 gcry_md_write(j->checksum_context, p, sz);
246
247         switch (j->compressed) {
248
249         case IMPORT_JOB_UNCOMPRESSED:
250                 r = import_job_write_uncompressed(j, p, sz);
251                 if (r < 0)
252                         return r;
253
254                 break;
255
256         case IMPORT_JOB_XZ:
257                 j->xz.next_in = p;
258                 j->xz.avail_in = sz;
259
260                 while (j->xz.avail_in > 0) {
261                         uint8_t buffer[16 * 1024];
262                         lzma_ret lzr;
263
264                         j->xz.next_out = buffer;
265                         j->xz.avail_out = sizeof(buffer);
266
267                         lzr = lzma_code(&j->xz, LZMA_RUN);
268                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
269                                 log_error("Decompression error.");
270                                 return -EIO;
271                         }
272
273                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
274                         if (r < 0)
275                                 return r;
276                 }
277
278                 break;
279
280         case IMPORT_JOB_GZIP:
281                 j->gzip.next_in = p;
282                 j->gzip.avail_in = sz;
283
284                 while (j->gzip.avail_in > 0) {
285                         uint8_t buffer[16 * 1024];
286
287                         j->gzip.next_out = buffer;
288                         j->gzip.avail_out = sizeof(buffer);
289
290                         r = inflate(&j->gzip, Z_NO_FLUSH);
291                         if (r != Z_OK && r != Z_STREAM_END) {
292                                 log_error("Decompression error.");
293                                 return -EIO;
294                         }
295
296                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
297                         if (r < 0)
298                                 return r;
299                 }
300
301                 break;
302
303         default:
304                 assert_not_reached("Unknown compression");
305         }
306
307         j->written_compressed += sz;
308
309         return 0;
310 }
311
312 static int import_job_open_disk(ImportJob *j) {
313         int r;
314
315         assert(j);
316
317         if (j->on_open_disk) {
318                 r = j->on_open_disk(j);
319                 if (r < 0)
320                         return r;
321         }
322
323         if (j->disk_fd >= 0) {
324                 /* Check if we can do sparse files */
325
326                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
327                         j->allow_sparse = true;
328                 else {
329                         if (errno != ESPIPE)
330                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
331
332                         j->allow_sparse = false;
333                 }
334         }
335
336         if (j->calc_checksum) {
337                 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
338                         log_error("Failed to initialize hash context.");
339                         return -EIO;
340                 }
341         }
342
343         return 0;
344 }
345
346 static int import_job_detect_compression(ImportJob *j) {
347         static const uint8_t xz_signature[] = {
348                 0xfd, '7', 'z', 'X', 'Z', 0x00
349         };
350         static const uint8_t gzip_signature[] = {
351                 0x1f, 0x8b
352         };
353
354         _cleanup_free_ uint8_t *stub = NULL;
355         size_t stub_size;
356
357         int r;
358
359         assert(j);
360
361         if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
362                 return 0;
363
364         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
365                 j->compressed = IMPORT_JOB_XZ;
366         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
367                 j->compressed = IMPORT_JOB_GZIP;
368         else
369                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
370
371         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
372         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
373
374         if (j->compressed == IMPORT_JOB_XZ) {
375                 lzma_ret xzr;
376
377                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
378                 if (xzr != LZMA_OK) {
379                         log_error("Failed to initialize XZ decoder.");
380                         return -EIO;
381                 }
382         }
383         if (j->compressed == IMPORT_JOB_GZIP) {
384                 r = inflateInit2(&j->gzip, 15+16);
385                 if (r != Z_OK) {
386                         log_error("Failed to initialize gzip decoder.");
387                         return -EIO;
388                 }
389         }
390
391         r = import_job_open_disk(j);
392         if (r < 0)
393                 return r;
394
395         /* Now, take the payload we read so far, and decompress it */
396         stub = j->payload;
397         stub_size = j->payload_size;
398
399         j->payload = NULL;
400         j->payload_size = 0;
401         j->payload_allocated = 0;
402
403         j->state = IMPORT_JOB_RUNNING;
404
405         r = import_job_write_compressed(j, stub, stub_size);
406         if (r < 0)
407                 return r;
408
409         return 0;
410 }
411
412 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
413         ImportJob *j = userdata;
414         size_t sz = size * nmemb;
415         int r;
416
417         assert(contents);
418         assert(j);
419
420         switch (j->state) {
421
422         case IMPORT_JOB_ANALYZING:
423                 /* Let's first check what it actually is */
424
425                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
426                         r = log_oom();
427                         goto fail;
428                 }
429
430                 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
431                 j->payload_size += sz;
432
433                 r = import_job_detect_compression(j);
434                 if (r < 0)
435                         goto fail;
436
437                 break;
438
439         case IMPORT_JOB_RUNNING:
440
441                 r = import_job_write_compressed(j, contents, sz);
442                 if (r < 0)
443                         goto fail;
444
445                 break;
446
447         case IMPORT_JOB_DONE:
448         case IMPORT_JOB_FAILED:
449                 r = -ESTALE;
450                 goto fail;
451
452         default:
453                 assert_not_reached("Impossible state.");
454         }
455
456         return sz;
457
458 fail:
459         import_job_finish(j, r);
460         return 0;
461 }
462
463 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
464         ImportJob *j = userdata;
465         size_t sz = size * nmemb;
466         _cleanup_free_ char *length = NULL, *last_modified = NULL;
467         char *etag;
468         int r;
469
470         assert(contents);
471         assert(j);
472
473         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
474                 r = -ESTALE;
475                 goto fail;
476         }
477
478         assert(j->state == IMPORT_JOB_ANALYZING);
479
480         r = curl_header_strdup(contents, sz, "ETag:", &etag);
481         if (r < 0) {
482                 log_oom();
483                 goto fail;
484         }
485         if (r > 0) {
486                 free(j->etag);
487                 j->etag = etag;
488
489                 if (strv_contains(j->old_etags, j->etag)) {
490                         log_info("Image already downloaded. Skipping download.");
491                         j->etag_exists = true;
492                         import_job_finish(j, 0);
493                         return sz;
494                 }
495
496                 return sz;
497         }
498
499         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
500         if (r < 0) {
501                 log_oom();
502                 goto fail;
503         }
504         if (r > 0) {
505                 (void) safe_atou64(length, &j->content_length);
506
507                 if (j->content_length != (uint64_t) -1) {
508                         char bytes[FORMAT_BYTES_MAX];
509
510                         if (j->content_length > j->compressed_max) {
511                                 log_error("Content too large.");
512                                 r = -EFBIG;
513                                 goto fail;
514                         }
515
516                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
517                 }
518
519                 return sz;
520         }
521
522         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
523         if (r < 0) {
524                 log_oom();
525                 goto fail;
526         }
527         if (r > 0) {
528                 (void) curl_parse_http_time(last_modified, &j->mtime);
529                 return sz;
530         }
531
532         return sz;
533
534 fail:
535         import_job_finish(j, r);
536         return 0;
537 }
538
539 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
540         ImportJob *j = userdata;
541         unsigned percent;
542         usec_t n;
543
544         assert(j);
545
546         if (dltotal <= 0)
547                 return 0;
548
549         percent = ((100 * dlnow) / dltotal);
550         n = now(CLOCK_MONOTONIC);
551
552         if (n > j->last_status_usec + USEC_PER_SEC &&
553             percent != j->progress_percent &&
554             dlnow < dltotal) {
555                 char buf[FORMAT_TIMESPAN_MAX];
556
557                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
558                         char y[FORMAT_BYTES_MAX];
559                         usec_t left, done;
560
561                         done = n - j->start_usec;
562                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
563
564                         log_info("Got %u%% of %s. %s left at %s/s.",
565                                  percent,
566                                  j->url,
567                                  format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
568                                  format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
569                 } else
570                         log_info("Got %u%% of %s.", percent, j->url);
571
572                 j->progress_percent = percent;
573                 j->last_status_usec = n;
574         }
575
576         return 0;
577 }
578
579 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
580         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
581
582         assert(url);
583         assert(glue);
584         assert(ret);
585
586         j = new0(ImportJob, 1);
587         if (!j)
588                 return -ENOMEM;
589
590         j->state = IMPORT_JOB_INIT;
591         j->disk_fd = -1;
592         j->userdata = userdata;
593         j->glue = glue;
594         j->content_length = (uint64_t) -1;
595         j->start_usec = now(CLOCK_MONOTONIC);
596         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
597
598         j->url = strdup(url);
599         if (!j->url)
600                 return -ENOMEM;
601
602         *ret = j;
603         j = NULL;
604
605         return 0;
606 }
607
608 int import_job_begin(ImportJob *j) {
609         int r;
610
611         assert(j);
612
613         if (j->state != IMPORT_JOB_INIT)
614                 return -EBUSY;
615
616         r = curl_glue_make(&j->curl, j->url, j);
617         if (r < 0)
618                 return r;
619
620         if (!strv_isempty(j->old_etags)) {
621                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
622
623                 cc = strv_join(j->old_etags, ", ");
624                 if (!cc)
625                         return -ENOMEM;
626
627                 hdr = strappend("If-None-Match: ", cc);
628                 if (!hdr)
629                         return -ENOMEM;
630
631                 j->request_header = curl_slist_new(hdr, NULL);
632                 if (!j->request_header)
633                         return -ENOMEM;
634
635                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
636                         return -EIO;
637         }
638
639         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
640                 return -EIO;
641
642         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
643                 return -EIO;
644
645         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
646                 return -EIO;
647
648         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
649                 return -EIO;
650
651         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
652                 return -EIO;
653
654         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
655                 return -EIO;
656
657         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
658                 return -EIO;
659
660         r = curl_glue_add(j->glue, j->curl);
661         if (r < 0)
662                 return r;
663
664         j->state = IMPORT_JOB_ANALYZING;
665
666         return 0;
667 }