chiark / gitweb /
logind: remove per-user runtime dir again if setup fails
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "import-job.h"
26
27 ImportJob* import_job_unref(ImportJob *j) {
28         if (!j)
29                 return NULL;
30
31         curl_glue_remove_and_free(j->glue, j->curl);
32         curl_slist_free_all(j->request_header);
33
34         safe_close(j->disk_fd);
35
36         if (j->compressed == IMPORT_JOB_XZ)
37                 lzma_end(&j->xz);
38         else if (j->compressed == IMPORT_JOB_GZIP)
39                 inflateEnd(&j->gzip);
40         else if (j->compressed == IMPORT_JOB_BZIP2)
41                 BZ2_bzDecompressEnd(&j->bzip2);
42
43         if (j->checksum_context)
44                 gcry_md_close(j->checksum_context);
45
46         free(j->url);
47         free(j->etag);
48         strv_free(j->old_etags);
49         free(j->payload);
50         free(j->checksum);
51
52         free(j);
53
54         return NULL;
55 }
56
57 static void import_job_finish(ImportJob *j, int ret) {
58         assert(j);
59
60         if (j->state == IMPORT_JOB_DONE ||
61             j->state == IMPORT_JOB_FAILED)
62                 return;
63
64         if (ret == 0) {
65                 j->state = IMPORT_JOB_DONE;
66                 j->progress_percent = 100;
67                 log_info("Download of %s complete.", j->url);
68         } else {
69                 j->state = IMPORT_JOB_FAILED;
70                 j->error = ret;
71         }
72
73         if (j->on_finished)
74                 j->on_finished(j);
75 }
76
77 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
78         ImportJob *j = NULL;
79         CURLcode code;
80         long status;
81         int r;
82
83         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
84                 return;
85
86         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
87                 return;
88
89         if (result != CURLE_OK) {
90                 log_error("Transfer failed: %s", curl_easy_strerror(result));
91                 r = -EIO;
92                 goto finish;
93         }
94
95         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
96         if (code != CURLE_OK) {
97                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
98                 r = -EIO;
99                 goto finish;
100         } else if (status == 304) {
101                 log_info("Image already downloaded. Skipping download.");
102                 j->etag_exists = true;
103                 r = 0;
104                 goto finish;
105         } else if (status >= 300) {
106                 log_error("HTTP request to %s failed with code %li.", j->url, status);
107                 r = -EIO;
108                 goto finish;
109         } else if (status < 200) {
110                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
111                 r = -EIO;
112                 goto finish;
113         }
114
115         if (j->state != IMPORT_JOB_RUNNING) {
116                 log_error("Premature connection termination.");
117                 r = -EIO;
118                 goto finish;
119         }
120
121         if (j->content_length != (uint64_t) -1 &&
122             j->content_length != j->written_compressed) {
123                 log_error("Download truncated.");
124                 r = -EIO;
125                 goto finish;
126         }
127
128         if (j->checksum_context) {
129                 uint8_t *k;
130
131                 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
132                 if (!k) {
133                         log_error("Failed to get checksum.");
134                         r = -EIO;
135                         goto finish;
136                 }
137
138                 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
139                 if (!j->checksum) {
140                         r = log_oom();
141                         goto finish;
142                 }
143
144                 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
145         }
146
147         if (j->disk_fd >= 0 && j->allow_sparse) {
148                 /* Make sure the file size is right, in case the file was
149                  * sparse and we just seeked for the last part */
150
151                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
152                         log_error_errno(errno, "Failed to truncate file: %m");
153                         r = -errno;
154                         goto finish;
155                 }
156
157                 if (j->etag)
158                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
159                 if (j->url)
160                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
161
162                 if (j->mtime != 0) {
163                         struct timespec ut[2];
164
165                         timespec_store(&ut[0], j->mtime);
166                         ut[1] = ut[0];
167                         (void) futimens(j->disk_fd, ut);
168
169                         (void) fd_setcrtime(j->disk_fd, j->mtime);
170                 }
171         }
172
173         r = 0;
174
175 finish:
176         import_job_finish(j, r);
177 }
178
179 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
180         ssize_t n;
181
182         assert(j);
183         assert(p);
184
185         if (sz <= 0)
186                 return 0;
187
188         if (j->written_uncompressed + sz < j->written_uncompressed) {
189                 log_error("File too large, overflow");
190                 return -EOVERFLOW;
191         }
192
193         if (j->written_uncompressed + sz > j->uncompressed_max) {
194                 log_error("File overly large, refusing");
195                 return -EFBIG;
196         }
197
198         if (j->disk_fd >= 0) {
199
200                 if (j->allow_sparse)
201                         n = sparse_write(j->disk_fd, p, sz, 64);
202                 else
203                         n = write(j->disk_fd, p, sz);
204                 if (n < 0) {
205                         log_error_errno(errno, "Failed to write file: %m");
206                         return -errno;
207                 }
208                 if ((size_t) n < sz) {
209                         log_error("Short write");
210                         return -EIO;
211                 }
212         } else {
213
214                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
215                         return log_oom();
216
217                 memcpy(j->payload + j->payload_size, p, sz);
218                 j->payload_size += sz;
219         }
220
221         j->written_uncompressed += sz;
222
223         return 0;
224 }
225
226 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
227         int r;
228
229         assert(j);
230         assert(p);
231
232         if (sz <= 0)
233                 return 0;
234
235         if (j->written_compressed + sz < j->written_compressed) {
236                 log_error("File too large, overflow");
237                 return -EOVERFLOW;
238         }
239
240         if (j->written_compressed + sz > j->compressed_max) {
241                 log_error("File overly large, refusing.");
242                 return -EFBIG;
243         }
244
245         if (j->content_length != (uint64_t) -1 &&
246             j->written_compressed + sz > j->content_length) {
247                 log_error("Content length incorrect.");
248                 return -EFBIG;
249         }
250
251         if (j->checksum_context)
252                 gcry_md_write(j->checksum_context, p, sz);
253
254         switch (j->compressed) {
255
256         case IMPORT_JOB_UNCOMPRESSED:
257                 r = import_job_write_uncompressed(j, p, sz);
258                 if (r < 0)
259                         return r;
260
261                 break;
262
263         case IMPORT_JOB_XZ:
264                 j->xz.next_in = p;
265                 j->xz.avail_in = sz;
266
267                 while (j->xz.avail_in > 0) {
268                         uint8_t buffer[16 * 1024];
269                         lzma_ret lzr;
270
271                         j->xz.next_out = buffer;
272                         j->xz.avail_out = sizeof(buffer);
273
274                         lzr = lzma_code(&j->xz, LZMA_RUN);
275                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
276                                 log_error("Decompression error.");
277                                 return -EIO;
278                         }
279
280                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
281                         if (r < 0)
282                                 return r;
283                 }
284
285                 break;
286
287         case IMPORT_JOB_GZIP:
288                 j->gzip.next_in = p;
289                 j->gzip.avail_in = sz;
290
291                 while (j->gzip.avail_in > 0) {
292                         uint8_t buffer[16 * 1024];
293
294                         j->gzip.next_out = buffer;
295                         j->gzip.avail_out = sizeof(buffer);
296
297                         r = inflate(&j->gzip, Z_NO_FLUSH);
298                         if (r != Z_OK && r != Z_STREAM_END) {
299                                 log_error("Decompression error.");
300                                 return -EIO;
301                         }
302
303                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
304                         if (r < 0)
305                                 return r;
306                 }
307
308                 break;
309
310         case IMPORT_JOB_BZIP2:
311                 j->bzip2.next_in = p;
312                 j->bzip2.avail_in = sz;
313
314                 while (j->bzip2.avail_in > 0) {
315                         uint8_t buffer[16 * 1024];
316
317                         j->bzip2.next_out = (char*) buffer;
318                         j->bzip2.avail_out = sizeof(buffer);
319
320                         r = BZ2_bzDecompress(&j->bzip2);
321                         if (r != BZ_OK && r != BZ_STREAM_END) {
322                                 log_error("Decompression error.");
323                                 return -EIO;
324                         }
325
326                         r = import_job_write_uncompressed(j,  buffer, sizeof(buffer) - j->bzip2.avail_out);
327                         if (r < 0)
328                                 return r;
329                 }
330
331                 break;
332
333         default:
334                 assert_not_reached("Unknown compression");
335         }
336
337         j->written_compressed += sz;
338
339         return 0;
340 }
341
342 static int import_job_open_disk(ImportJob *j) {
343         int r;
344
345         assert(j);
346
347         if (j->on_open_disk) {
348                 r = j->on_open_disk(j);
349                 if (r < 0)
350                         return r;
351         }
352
353         if (j->disk_fd >= 0) {
354                 /* Check if we can do sparse files */
355
356                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
357                         j->allow_sparse = true;
358                 else {
359                         if (errno != ESPIPE)
360                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
361
362                         j->allow_sparse = false;
363                 }
364         }
365
366         if (j->calc_checksum) {
367                 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
368                         log_error("Failed to initialize hash context.");
369                         return -EIO;
370                 }
371         }
372
373         return 0;
374 }
375
376 static int import_job_detect_compression(ImportJob *j) {
377         static const uint8_t xz_signature[] = {
378                 0xfd, '7', 'z', 'X', 'Z', 0x00
379         };
380         static const uint8_t gzip_signature[] = {
381                 0x1f, 0x8b
382         };
383         static const uint8_t bzip2_signature[] = {
384                 'B', 'Z', 'h'
385         };
386
387         _cleanup_free_ uint8_t *stub = NULL;
388         size_t stub_size;
389
390         int r;
391
392         assert(j);
393
394         if (j->payload_size < MAX3(sizeof(xz_signature),
395                                    sizeof(gzip_signature),
396                                    sizeof(bzip2_signature)))
397                 return 0;
398
399         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
400                 j->compressed = IMPORT_JOB_XZ;
401         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
402                 j->compressed = IMPORT_JOB_GZIP;
403         else if (memcmp(j->payload, bzip2_signature, sizeof(bzip2_signature)) == 0)
404                 j->compressed = IMPORT_JOB_BZIP2;
405         else
406                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
407
408         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
409         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
410         log_debug("Stream is BZIP2 compressed: %s", yes_no(j->compressed == IMPORT_JOB_BZIP2));
411
412         if (j->compressed == IMPORT_JOB_XZ) {
413                 lzma_ret xzr;
414
415                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
416                 if (xzr != LZMA_OK) {
417                         log_error("Failed to initialize XZ decoder.");
418                         return -EIO;
419                 }
420         }
421         if (j->compressed == IMPORT_JOB_GZIP) {
422                 r = inflateInit2(&j->gzip, 15+16);
423                 if (r != Z_OK) {
424                         log_error("Failed to initialize gzip decoder.");
425                         return -EIO;
426                 }
427         }
428         if (j->compressed == IMPORT_JOB_BZIP2) {
429                 r = BZ2_bzDecompressInit(&j->bzip2, 0, 0);
430                 if (r != BZ_OK) {
431                         log_error("Failed to initialize bzip2 decoder.");
432                         return -EIO;
433                 }
434         }
435
436         r = import_job_open_disk(j);
437         if (r < 0)
438                 return r;
439
440         /* Now, take the payload we read so far, and decompress it */
441         stub = j->payload;
442         stub_size = j->payload_size;
443
444         j->payload = NULL;
445         j->payload_size = 0;
446         j->payload_allocated = 0;
447
448         j->state = IMPORT_JOB_RUNNING;
449
450         r = import_job_write_compressed(j, stub, stub_size);
451         if (r < 0)
452                 return r;
453
454         return 0;
455 }
456
457 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
458         ImportJob *j = userdata;
459         size_t sz = size * nmemb;
460         int r;
461
462         assert(contents);
463         assert(j);
464
465         switch (j->state) {
466
467         case IMPORT_JOB_ANALYZING:
468                 /* Let's first check what it actually is */
469
470                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
471                         r = log_oom();
472                         goto fail;
473                 }
474
475                 memcpy(j->payload + j->payload_size, contents, sz);
476                 j->payload_size += sz;
477
478                 r = import_job_detect_compression(j);
479                 if (r < 0)
480                         goto fail;
481
482                 break;
483
484         case IMPORT_JOB_RUNNING:
485
486                 r = import_job_write_compressed(j, contents, sz);
487                 if (r < 0)
488                         goto fail;
489
490                 break;
491
492         case IMPORT_JOB_DONE:
493         case IMPORT_JOB_FAILED:
494                 r = -ESTALE;
495                 goto fail;
496
497         default:
498                 assert_not_reached("Impossible state.");
499         }
500
501         return sz;
502
503 fail:
504         import_job_finish(j, r);
505         return 0;
506 }
507
508 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
509         ImportJob *j = userdata;
510         size_t sz = size * nmemb;
511         _cleanup_free_ char *length = NULL, *last_modified = NULL;
512         char *etag;
513         int r;
514
515         assert(contents);
516         assert(j);
517
518         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
519                 r = -ESTALE;
520                 goto fail;
521         }
522
523         assert(j->state == IMPORT_JOB_ANALYZING);
524
525         r = curl_header_strdup(contents, sz, "ETag:", &etag);
526         if (r < 0) {
527                 log_oom();
528                 goto fail;
529         }
530         if (r > 0) {
531                 free(j->etag);
532                 j->etag = etag;
533
534                 if (strv_contains(j->old_etags, j->etag)) {
535                         log_info("Image already downloaded. Skipping download.");
536                         j->etag_exists = true;
537                         import_job_finish(j, 0);
538                         return sz;
539                 }
540
541                 return sz;
542         }
543
544         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
545         if (r < 0) {
546                 log_oom();
547                 goto fail;
548         }
549         if (r > 0) {
550                 (void) safe_atou64(length, &j->content_length);
551
552                 if (j->content_length != (uint64_t) -1) {
553                         char bytes[FORMAT_BYTES_MAX];
554
555                         if (j->content_length > j->compressed_max) {
556                                 log_error("Content too large.");
557                                 r = -EFBIG;
558                                 goto fail;
559                         }
560
561                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
562                 }
563
564                 return sz;
565         }
566
567         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
568         if (r < 0) {
569                 log_oom();
570                 goto fail;
571         }
572         if (r > 0) {
573                 (void) curl_parse_http_time(last_modified, &j->mtime);
574                 return sz;
575         }
576
577         if (j->on_header) {
578                 r = j->on_header(j, contents, sz);
579                 if (r < 0)
580                         goto fail;
581         }
582
583         return sz;
584
585 fail:
586         import_job_finish(j, r);
587         return 0;
588 }
589
590 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
591         ImportJob *j = userdata;
592         unsigned percent;
593         usec_t n;
594
595         assert(j);
596
597         if (dltotal <= 0)
598                 return 0;
599
600         percent = ((100 * dlnow) / dltotal);
601         n = now(CLOCK_MONOTONIC);
602
603         if (n > j->last_status_usec + USEC_PER_SEC &&
604             percent != j->progress_percent &&
605             dlnow < dltotal) {
606                 char buf[FORMAT_TIMESPAN_MAX];
607
608                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
609                         char y[FORMAT_BYTES_MAX];
610                         usec_t left, done;
611
612                         done = n - j->start_usec;
613                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
614
615                         log_info("Got %u%% of %s. %s left at %s/s.",
616                                  percent,
617                                  j->url,
618                                  format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
619                                  format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
620                 } else
621                         log_info("Got %u%% of %s.", percent, j->url);
622
623                 j->progress_percent = percent;
624                 j->last_status_usec = n;
625
626                 if (j->on_progress)
627                         j->on_progress(j);
628         }
629
630         return 0;
631 }
632
633 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
634         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
635
636         assert(url);
637         assert(glue);
638         assert(ret);
639
640         j = new0(ImportJob, 1);
641         if (!j)
642                 return -ENOMEM;
643
644         j->state = IMPORT_JOB_INIT;
645         j->disk_fd = -1;
646         j->userdata = userdata;
647         j->glue = glue;
648         j->content_length = (uint64_t) -1;
649         j->start_usec = now(CLOCK_MONOTONIC);
650         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
651
652         j->url = strdup(url);
653         if (!j->url)
654                 return -ENOMEM;
655
656         *ret = j;
657         j = NULL;
658
659         return 0;
660 }
661
662 int import_job_begin(ImportJob *j) {
663         int r;
664
665         assert(j);
666
667         if (j->state != IMPORT_JOB_INIT)
668                 return -EBUSY;
669
670         r = curl_glue_make(&j->curl, j->url, j);
671         if (r < 0)
672                 return r;
673
674         if (!strv_isempty(j->old_etags)) {
675                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
676
677                 cc = strv_join(j->old_etags, ", ");
678                 if (!cc)
679                         return -ENOMEM;
680
681                 hdr = strappend("If-None-Match: ", cc);
682                 if (!hdr)
683                         return -ENOMEM;
684
685                 if (!j->request_header) {
686                         j->request_header = curl_slist_new(hdr, NULL);
687                         if (!j->request_header)
688                                 return -ENOMEM;
689                 } else {
690                         struct curl_slist *l;
691
692                         l = curl_slist_append(j->request_header, hdr);
693                         if (!l)
694                                 return -ENOMEM;
695
696                         j->request_header = l;
697                 }
698         }
699
700         if (j->request_header) {
701                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
702                         return -EIO;
703         }
704
705         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
706                 return -EIO;
707
708         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
709                 return -EIO;
710
711         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
712                 return -EIO;
713
714         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
715                 return -EIO;
716
717         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
718                 return -EIO;
719
720         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
721                 return -EIO;
722
723         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
724                 return -EIO;
725
726         r = curl_glue_add(j->glue, j->curl);
727         if (r < 0)
728                 return r;
729
730         j->state = IMPORT_JOB_ANALYZING;
731
732         return 0;
733 }