chiark / gitweb /
import: add support for pulling raw tar balls as containers
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "import-job.h"
26
27 ImportJob* import_job_unref(ImportJob *j) {
28         if (!j)
29                 return NULL;
30
31         curl_glue_remove_and_free(j->glue, j->curl);
32         curl_slist_free_all(j->request_header);
33
34         safe_close(j->disk_fd);
35
36         if (j->compressed == IMPORT_JOB_XZ)
37                 lzma_end(&j->xz);
38         else if (j->compressed == IMPORT_JOB_GZIP)
39                 inflateEnd(&j->gzip);
40
41         free(j->url);
42         free(j->etag);
43         strv_free(j->old_etags);
44         free(j->payload);
45
46         free(j);
47
48         return NULL;
49 }
50
51 DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
52
53 static void import_job_finish(ImportJob *j, int ret) {
54         assert(j);
55
56         if (j->state == IMPORT_JOB_DONE ||
57             j->state == IMPORT_JOB_FAILED)
58                 return;
59
60         if (ret == 0)
61                 j->state = IMPORT_JOB_DONE;
62         else {
63                 j->state = IMPORT_JOB_FAILED;
64                 j->error = ret;
65         }
66
67         if (j->on_finished)
68                 j->on_finished(j);
69 }
70
71 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
72         ImportJob *j = NULL;
73         CURLcode code;
74         long status;
75         int r;
76
77         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
78                 return;
79
80         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
81                 return;
82
83         if (result != CURLE_OK) {
84                 log_error("Transfer failed: %s", curl_easy_strerror(result));
85                 r = -EIO;
86                 goto finish;
87         }
88
89         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
90         if (code != CURLE_OK) {
91                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
92                 r = -EIO;
93                 goto finish;
94         } else if (status == 304) {
95                 log_info("Image already downloaded. Skipping download.");
96                 r = 0;
97                 goto finish;
98         } else if (status >= 300) {
99                 log_error("HTTP request to %s failed with code %li.", j->url, status);
100                 r = -EIO;
101                 goto finish;
102         } else if (status < 200) {
103                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
104                 r = -EIO;
105                 goto finish;
106         }
107
108         if (j->state != IMPORT_JOB_RUNNING) {
109                 log_error("Premature connection termination.");
110                 r = -EIO;
111                 goto finish;
112         }
113
114         if (j->content_length != (uint64_t) -1 &&
115             j->content_length != j->written_compressed) {
116                 log_error("Download truncated.");
117                 r = -EIO;
118                 goto finish;
119         }
120
121         if (j->disk_fd >= 0 && j->allow_sparse) {
122                 /* Make sure the file size is right, in case the file was
123                  * sparse and we just seeked for the last part */
124
125                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
126                         log_error_errno(errno, "Failed to truncate file: %m");
127                         r = -errno;
128                         goto finish;
129                 }
130
131                 if (j->etag)
132                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
133                 if (j->url)
134                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
135
136                 if (j->mtime != 0) {
137                         struct timespec ut[2];
138
139                         timespec_store(&ut[0], j->mtime);
140                         ut[1] = ut[0];
141                         (void) futimens(j->disk_fd, ut);
142
143                         (void) fd_setcrtime(j->disk_fd, j->mtime);
144                 }
145         }
146
147         r = 0;
148
149 finish:
150         import_job_finish(j, r);
151 }
152
153
154 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
155         ssize_t n;
156
157         assert(j);
158         assert(p);
159         assert(sz > 0);
160         assert(j->disk_fd >= 0);
161
162         if (j->written_uncompressed + sz < j->written_uncompressed) {
163                 log_error("File too large, overflow");
164                 return -EOVERFLOW;
165         }
166
167         if (j->written_uncompressed + sz > j->uncompressed_max) {
168                 log_error("File overly large, refusing");
169                 return -EFBIG;
170         }
171
172         if (j->disk_fd >= 0) {
173
174                 if (j->allow_sparse)
175                         n = sparse_write(j->disk_fd, p, sz, 64);
176                 else
177                         n = write(j->disk_fd, p, sz);
178                 if (n < 0) {
179                         log_error_errno(errno, "Failed to write file: %m");
180                         return -errno;
181                 }
182                 if ((size_t) n < sz) {
183                         log_error("Short write");
184                         return -EIO;
185                 }
186         } else {
187
188                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
189                         return log_oom();
190
191                 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
192                 j->payload_size += sz;
193         }
194
195         j->written_uncompressed += sz;
196
197         return 0;
198 }
199
200 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
201         int r;
202
203         assert(j);
204         assert(p);
205         assert(sz > 0);
206         assert(j->disk_fd >= 0);
207
208         if (j->written_compressed + sz < j->written_compressed) {
209                 log_error("File too large, overflow");
210                 return -EOVERFLOW;
211         }
212
213         if (j->written_compressed + sz > j->compressed_max) {
214                 log_error("File overly large, refusing.");
215                 return -EFBIG;
216         }
217
218         if (j->content_length != (uint64_t) -1 &&
219             j->written_compressed + sz > j->content_length) {
220                 log_error("Content length incorrect.");
221                 return -EFBIG;
222         }
223
224         switch (j->compressed) {
225
226         case IMPORT_JOB_UNCOMPRESSED:
227                 r = import_job_write_uncompressed(j, p, sz);
228                 if (r < 0)
229                         return r;
230
231                 break;
232
233         case IMPORT_JOB_XZ:
234                 j->xz.next_in = p;
235                 j->xz.avail_in = sz;
236
237                 while (j->xz.avail_in > 0) {
238                         uint8_t buffer[16 * 1024];
239                         lzma_ret lzr;
240
241                         j->xz.next_out = buffer;
242                         j->xz.avail_out = sizeof(buffer);
243
244                         lzr = lzma_code(&j->xz, LZMA_RUN);
245                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
246                                 log_error("Decompression error.");
247                                 return -EIO;
248                         }
249
250                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
251                         if (r < 0)
252                                 return r;
253                 }
254
255                 break;
256
257         case IMPORT_JOB_GZIP:
258                 j->gzip.next_in = p;
259                 j->gzip.avail_in = sz;
260
261                 while (j->gzip.avail_in > 0) {
262                         uint8_t buffer[16 * 1024];
263
264                         j->gzip.next_out = buffer;
265                         j->gzip.avail_out = sizeof(buffer);
266
267                         r = inflate(&j->gzip, Z_NO_FLUSH);
268                         if (r != Z_OK && r != Z_STREAM_END) {
269                                 log_error("Decompression error.");
270                                 return -EIO;
271                         }
272
273                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
274                         if (r < 0)
275                                 return r;
276                 }
277
278                 break;
279
280         default:
281                 assert_not_reached("Unknown compression");
282         }
283
284         j->written_compressed += sz;
285
286         return 0;
287 }
288
289 static int import_job_open_disk(ImportJob *j) {
290         int r;
291
292         assert(j);
293
294         if (j->on_open_disk) {
295                 r = j->on_open_disk(j);
296                 if (r < 0)
297                         return r;
298         }
299
300         if (j->disk_fd >= 0) {
301                 /* Check if we can do sparse files */
302
303                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
304                         j->allow_sparse = true;
305                 else {
306                         if (errno != ESPIPE)
307                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
308
309                         j->allow_sparse = false;
310                 }
311         }
312
313         return 0;
314 }
315
316 static int import_job_detect_compression(ImportJob *j) {
317         static const uint8_t xz_signature[] = {
318                 0xfd, '7', 'z', 'X', 'Z', 0x00
319         };
320         static const uint8_t gzip_signature[] = {
321                 0x1f, 0x8b
322         };
323
324         _cleanup_free_ uint8_t *stub = NULL;
325         size_t stub_size;
326
327         int r;
328
329         assert(j);
330
331         if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
332                 return 0;
333
334         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
335                 j->compressed = IMPORT_JOB_XZ;
336         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
337                 j->compressed = IMPORT_JOB_GZIP;
338         else
339                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
340
341         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
342         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
343
344         if (j->compressed == IMPORT_JOB_XZ) {
345                 lzma_ret xzr;
346
347                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
348                 if (xzr != LZMA_OK) {
349                         log_error("Failed to initialize XZ decoder.");
350                         return -EIO;
351                 }
352         }
353         if (j->compressed == IMPORT_JOB_GZIP) {
354                 r = inflateInit2(&j->gzip, 15+16);
355                 if (r != Z_OK) {
356                         log_error("Failed to initialize gzip decoder.");
357                         return -EIO;
358                 }
359         }
360
361         r = import_job_open_disk(j);
362         if (r < 0)
363                 return r;
364
365         /* Now, take the payload we read so far, and decompress it */
366         stub = j->payload;
367         stub_size = j->payload_size;
368
369         j->payload = NULL;
370         j->payload_size = 0;
371
372         j->state = IMPORT_JOB_RUNNING;
373
374         r = import_job_write_compressed(j, stub, stub_size);
375         if (r < 0)
376                 return r;
377
378         return 0;
379 }
380
381 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
382         ImportJob *j = userdata;
383         size_t sz = size * nmemb;
384         int r;
385
386         assert(contents);
387         assert(j);
388
389         switch (j->state) {
390
391         case IMPORT_JOB_ANALYZING:
392                 /* Let's first check what it actually is */
393
394                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
395                         r = log_oom();
396                         goto fail;
397                 }
398
399                 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
400                 j->payload_size += sz;
401
402                 r = import_job_detect_compression(j);
403                 if (r < 0)
404                         goto fail;
405
406                 break;
407
408         case IMPORT_JOB_RUNNING:
409
410                 r = import_job_write_compressed(j, contents, sz);
411                 if (r < 0)
412                         goto fail;
413
414                 break;
415
416         case IMPORT_JOB_DONE:
417         case IMPORT_JOB_FAILED:
418                 r = -ESTALE;
419                 goto fail;
420
421         default:
422                 assert_not_reached("Impossible state.");
423         }
424
425         return sz;
426
427 fail:
428         import_job_finish(j, r);
429         return 0;
430 }
431
432 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
433         ImportJob *j = userdata;
434         size_t sz = size * nmemb;
435         _cleanup_free_ char *length = NULL, *last_modified = NULL;
436         char *etag;
437         int r;
438
439         assert(contents);
440         assert(j);
441
442         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
443                 r = -ESTALE;
444                 goto fail;
445         }
446
447         assert(j->state == IMPORT_JOB_ANALYZING);
448
449         r = curl_header_strdup(contents, sz, "ETag:", &etag);
450         if (r < 0) {
451                 log_oom();
452                 goto fail;
453         }
454         if (r > 0) {
455                 free(j->etag);
456                 j->etag = etag;
457
458                 if (strv_contains(j->old_etags, j->etag)) {
459                         log_info("Image already downloaded. Skipping download.");
460                         import_job_finish(j, 0);
461                         return sz;
462                 }
463
464                 return sz;
465         }
466
467         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
468         if (r < 0) {
469                 log_oom();
470                 goto fail;
471         }
472         if (r > 0) {
473                 (void) safe_atou64(length, &j->content_length);
474
475                 if (j->content_length != (uint64_t) -1) {
476                         char bytes[FORMAT_BYTES_MAX];
477
478                         if (j->content_length > j->compressed_max) {
479                                 log_error("Content too large.");
480                                 r = -EFBIG;
481                                 goto fail;
482                         }
483
484                         log_info("Downloading %s.", format_bytes(bytes, sizeof(bytes), j->content_length));
485                 }
486
487                 return sz;
488         }
489
490         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
491         if (r < 0) {
492                 log_oom();
493                 goto fail;
494         }
495         if (r > 0) {
496                 (void) curl_parse_http_time(last_modified, &j->mtime);
497                 return sz;
498         }
499
500         return sz;
501
502 fail:
503         import_job_finish(j, r);
504         return 0;
505 }
506
507 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
508         ImportJob *j = userdata;
509         unsigned percent;
510         usec_t n;
511
512         assert(j);
513
514         if (dltotal <= 0)
515                 return 0;
516
517         percent = ((100 * dlnow) / dltotal);
518         n = now(CLOCK_MONOTONIC);
519
520         if (n > j->last_status_usec + USEC_PER_SEC &&
521             percent != j->progress_percent) {
522                 char buf[FORMAT_TIMESPAN_MAX];
523
524                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
525                         usec_t left, done;
526
527                         done = n - j->start_usec;
528                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
529
530                         log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
531                 } else
532                         log_info("Got %u%% of %s.", percent, j->url);
533
534                 j->progress_percent = percent;
535                 j->last_status_usec = n;
536         }
537
538         return 0;
539 }
540
541 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
542         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
543
544         assert(url);
545         assert(glue);
546         assert(ret);
547
548         j = new0(ImportJob, 1);
549         if (!j)
550                 return -ENOMEM;
551
552         j->state = IMPORT_JOB_INIT;
553         j->disk_fd = -1;
554         j->userdata = userdata;
555         j->glue = glue;
556         j->content_length = (uint64_t) -1;
557         j->start_usec = now(CLOCK_MONOTONIC);
558         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
559
560         j->url = strdup(url);
561         if (!j->url)
562                 return -ENOMEM;
563
564         *ret = j;
565         j = NULL;
566
567         return 0;
568 }
569
570 int import_job_begin(ImportJob *j) {
571         int r;
572
573         assert(j);
574
575         if (j->state != IMPORT_JOB_INIT)
576                 return -EBUSY;
577
578         r = curl_glue_make(&j->curl, j->url, j);
579         if (r < 0)
580                 return r;
581
582         if (!strv_isempty(j->old_etags)) {
583                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
584
585                 cc = strv_join(j->old_etags, ", ");
586                 if (!cc)
587                         return -ENOMEM;
588
589                 hdr = strappend("If-None-Match: ", cc);
590                 if (!hdr)
591                         return -ENOMEM;
592
593                 j->request_header = curl_slist_new(hdr, NULL);
594                 if (!j->request_header)
595                         return -ENOMEM;
596
597                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
598                         return -EIO;
599         }
600
601         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
602                 return -EIO;
603
604         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
605                 return -EIO;
606
607         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
608                 return -EIO;
609
610         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
611                 return -EIO;
612
613         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
614                 return -EIO;
615
616         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
617                 return -EIO;
618
619         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
620                 return -EIO;
621
622         r = curl_glue_add(j->glue, j->curl);
623         if (r < 0)
624                 return r;
625
626         j->state = IMPORT_JOB_ANALYZING;
627
628         return 0;
629 }