chiark / gitweb /
b5237449c95ed8ea255ab9a5a003b4d7f39c9681
[elogind.git] / src / import / pull-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "machine-pool.h"
26 #include "pull-job.h"
27
/* The /var/lib/machines directory is grown after every 10 MiB written */
#define PULL_GROW_INTERVAL_BYTES (UINT64_C(10) << 20)
30
31 PullJob* pull_job_unref(PullJob *j) {
32         if (!j)
33                 return NULL;
34
35         curl_glue_remove_and_free(j->glue, j->curl);
36         curl_slist_free_all(j->request_header);
37
38         safe_close(j->disk_fd);
39
40         import_compress_free(&j->compress);
41
42         if (j->checksum_context)
43                 gcry_md_close(j->checksum_context);
44
45         free(j->url);
46         free(j->etag);
47         strv_free(j->old_etags);
48         free(j->payload);
49         free(j->checksum);
50
51         free(j);
52
53         return NULL;
54 }
55
56 static void pull_job_finish(PullJob *j, int ret) {
57         assert(j);
58
59         if (j->state == PULL_JOB_DONE ||
60             j->state == PULL_JOB_FAILED)
61                 return;
62
63         if (ret == 0) {
64                 j->state = PULL_JOB_DONE;
65                 j->progress_percent = 100;
66                 log_info("Download of %s complete.", j->url);
67         } else {
68                 j->state = PULL_JOB_FAILED;
69                 j->error = ret;
70         }
71
72         if (j->on_finished)
73                 j->on_finished(j);
74 }
75
76 void pull_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
77         PullJob *j = NULL;
78         CURLcode code;
79         long status;
80         int r;
81
82         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
83                 return;
84
85         if (!j || j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED)
86                 return;
87
88         if (result != CURLE_OK) {
89                 log_error("Transfer failed: %s", curl_easy_strerror(result));
90                 r = -EIO;
91                 goto finish;
92         }
93
94         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
95         if (code != CURLE_OK) {
96                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
97                 r = -EIO;
98                 goto finish;
99         } else if (status == 304) {
100                 log_info("Image already downloaded. Skipping download.");
101                 j->etag_exists = true;
102                 r = 0;
103                 goto finish;
104         } else if (status >= 300) {
105                 log_error("HTTP request to %s failed with code %li.", j->url, status);
106                 r = -EIO;
107                 goto finish;
108         } else if (status < 200) {
109                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
110                 r = -EIO;
111                 goto finish;
112         }
113
114         if (j->state != PULL_JOB_RUNNING) {
115                 log_error("Premature connection termination.");
116                 r = -EIO;
117                 goto finish;
118         }
119
120         if (j->content_length != (uint64_t) -1 &&
121             j->content_length != j->written_compressed) {
122                 log_error("Download truncated.");
123                 r = -EIO;
124                 goto finish;
125         }
126
127         if (j->checksum_context) {
128                 uint8_t *k;
129
130                 k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
131                 if (!k) {
132                         log_error("Failed to get checksum.");
133                         r = -EIO;
134                         goto finish;
135                 }
136
137                 j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
138                 if (!j->checksum) {
139                         r = log_oom();
140                         goto finish;
141                 }
142
143                 log_debug("SHA256 of %s is %s.", j->url, j->checksum);
144         }
145
146         if (j->disk_fd >= 0 && j->allow_sparse) {
147                 /* Make sure the file size is right, in case the file was
148                  * sparse and we just seeked for the last part */
149
150                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
151                         log_error_errno(errno, "Failed to truncate file: %m");
152                         r = -errno;
153                         goto finish;
154                 }
155
156                 if (j->etag)
157                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
158                 if (j->url)
159                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
160
161                 if (j->mtime != 0) {
162                         struct timespec ut[2];
163
164                         timespec_store(&ut[0], j->mtime);
165                         ut[1] = ut[0];
166                         (void) futimens(j->disk_fd, ut);
167
168                         (void) fd_setcrtime(j->disk_fd, j->mtime);
169                 }
170         }
171
172         r = 0;
173
174 finish:
175         pull_job_finish(j, r);
176 }
177
178 static int pull_job_write_uncompressed(const void *p, size_t sz, void *userdata) {
179         PullJob *j = userdata;
180         ssize_t n;
181
182         assert(j);
183         assert(p);
184
185         if (sz <= 0)
186                 return 0;
187
188         if (j->written_uncompressed + sz < j->written_uncompressed) {
189                 log_error("File too large, overflow");
190                 return -EOVERFLOW;
191         }
192
193         if (j->written_uncompressed + sz > j->uncompressed_max) {
194                 log_error("File overly large, refusing");
195                 return -EFBIG;
196         }
197
198         if (j->disk_fd >= 0) {
199
200                 if (j->grow_machine_directory && j->written_since_last_grow >= PULL_GROW_INTERVAL_BYTES) {
201                         j->written_since_last_grow = 0;
202                         grow_machine_directory();
203                 }
204
205                 if (j->allow_sparse)
206                         n = sparse_write(j->disk_fd, p, sz, 64);
207                 else
208                         n = write(j->disk_fd, p, sz);
209                 if (n < 0) {
210                         log_error_errno(errno, "Failed to write file: %m");
211                         return -errno;
212                 }
213                 if ((size_t) n < sz) {
214                         log_error("Short write");
215                         return -EIO;
216                 }
217         } else {
218
219                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
220                         return log_oom();
221
222                 memcpy(j->payload + j->payload_size, p, sz);
223                 j->payload_size += sz;
224         }
225
226         j->written_uncompressed += sz;
227         j->written_since_last_grow += sz;
228
229         return 0;
230 }
231
232 static int pull_job_write_compressed(PullJob *j, void *p, size_t sz) {
233         int r;
234
235         assert(j);
236         assert(p);
237
238         if (sz <= 0)
239                 return 0;
240
241         if (j->written_compressed + sz < j->written_compressed) {
242                 log_error("File too large, overflow");
243                 return -EOVERFLOW;
244         }
245
246         if (j->written_compressed + sz > j->compressed_max) {
247                 log_error("File overly large, refusing.");
248                 return -EFBIG;
249         }
250
251         if (j->content_length != (uint64_t) -1 &&
252             j->written_compressed + sz > j->content_length) {
253                 log_error("Content length incorrect.");
254                 return -EFBIG;
255         }
256
257         if (j->checksum_context)
258                 gcry_md_write(j->checksum_context, p, sz);
259
260         r = import_uncompress(&j->compress, p, sz, pull_job_write_uncompressed, j);
261         if (r < 0)
262                 return r;
263
264         j->written_compressed += sz;
265
266         return 0;
267 }
268
269 static int pull_job_open_disk(PullJob *j) {
270         int r;
271
272         assert(j);
273
274         if (j->on_open_disk) {
275                 r = j->on_open_disk(j);
276                 if (r < 0)
277                         return r;
278         }
279
280         if (j->disk_fd >= 0) {
281                 /* Check if we can do sparse files */
282
283                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
284                         j->allow_sparse = true;
285                 else {
286                         if (errno != ESPIPE)
287                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
288
289                         j->allow_sparse = false;
290                 }
291         }
292
293         if (j->calc_checksum) {
294                 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
295                         log_error("Failed to initialize hash context.");
296                         return -EIO;
297                 }
298         }
299
300         return 0;
301 }
302
303 static int pull_job_detect_compression(PullJob *j) {
304         _cleanup_free_ uint8_t *stub = NULL;
305         size_t stub_size;
306
307         int r;
308
309         assert(j);
310
311         r = import_uncompress_detect(&j->compress, j->payload, j->payload_size);
312         if (r < 0)
313                 return log_error_errno(r, "Failed to initialize compressor: %m");
314         if (r == 0)
315                 return 0;
316
317         log_debug("Stream is compressed: %s", import_compress_type_to_string(j->compress.type));
318
319         r = pull_job_open_disk(j);
320         if (r < 0)
321                 return r;
322
323         /* Now, take the payload we read so far, and decompress it */
324         stub = j->payload;
325         stub_size = j->payload_size;
326
327         j->payload = NULL;
328         j->payload_size = 0;
329         j->payload_allocated = 0;
330
331         j->state = PULL_JOB_RUNNING;
332
333         r = pull_job_write_compressed(j, stub, stub_size);
334         if (r < 0)
335                 return r;
336
337         return 0;
338 }
339
340 static size_t pull_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
341         PullJob *j = userdata;
342         size_t sz = size * nmemb;
343         int r;
344
345         assert(contents);
346         assert(j);
347
348         switch (j->state) {
349
350         case PULL_JOB_ANALYZING:
351                 /* Let's first check what it actually is */
352
353                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
354                         r = log_oom();
355                         goto fail;
356                 }
357
358                 memcpy(j->payload + j->payload_size, contents, sz);
359                 j->payload_size += sz;
360
361                 r = pull_job_detect_compression(j);
362                 if (r < 0)
363                         goto fail;
364
365                 break;
366
367         case PULL_JOB_RUNNING:
368
369                 r = pull_job_write_compressed(j, contents, sz);
370                 if (r < 0)
371                         goto fail;
372
373                 break;
374
375         case PULL_JOB_DONE:
376         case PULL_JOB_FAILED:
377                 r = -ESTALE;
378                 goto fail;
379
380         default:
381                 assert_not_reached("Impossible state.");
382         }
383
384         return sz;
385
386 fail:
387         pull_job_finish(j, r);
388         return 0;
389 }
390
391 static size_t pull_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
392         PullJob *j = userdata;
393         size_t sz = size * nmemb;
394         _cleanup_free_ char *length = NULL, *last_modified = NULL;
395         char *etag;
396         int r;
397
398         assert(contents);
399         assert(j);
400
401         if (j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED) {
402                 r = -ESTALE;
403                 goto fail;
404         }
405
406         assert(j->state == PULL_JOB_ANALYZING);
407
408         r = curl_header_strdup(contents, sz, "ETag:", &etag);
409         if (r < 0) {
410                 log_oom();
411                 goto fail;
412         }
413         if (r > 0) {
414                 free(j->etag);
415                 j->etag = etag;
416
417                 if (strv_contains(j->old_etags, j->etag)) {
418                         log_info("Image already downloaded. Skipping download.");
419                         j->etag_exists = true;
420                         pull_job_finish(j, 0);
421                         return sz;
422                 }
423
424                 return sz;
425         }
426
427         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
428         if (r < 0) {
429                 log_oom();
430                 goto fail;
431         }
432         if (r > 0) {
433                 (void) safe_atou64(length, &j->content_length);
434
435                 if (j->content_length != (uint64_t) -1) {
436                         char bytes[FORMAT_BYTES_MAX];
437
438                         if (j->content_length > j->compressed_max) {
439                                 log_error("Content too large.");
440                                 r = -EFBIG;
441                                 goto fail;
442                         }
443
444                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
445                 }
446
447                 return sz;
448         }
449
450         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
451         if (r < 0) {
452                 log_oom();
453                 goto fail;
454         }
455         if (r > 0) {
456                 (void) curl_parse_http_time(last_modified, &j->mtime);
457                 return sz;
458         }
459
460         if (j->on_header) {
461                 r = j->on_header(j, contents, sz);
462                 if (r < 0)
463                         goto fail;
464         }
465
466         return sz;
467
468 fail:
469         pull_job_finish(j, r);
470         return 0;
471 }
472
473 static int pull_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
474         PullJob *j = userdata;
475         unsigned percent;
476         usec_t n;
477
478         assert(j);
479
480         if (dltotal <= 0)
481                 return 0;
482
483         percent = ((100 * dlnow) / dltotal);
484         n = now(CLOCK_MONOTONIC);
485
486         if (n > j->last_status_usec + USEC_PER_SEC &&
487             percent != j->progress_percent &&
488             dlnow < dltotal) {
489                 char buf[FORMAT_TIMESPAN_MAX];
490
491                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
492                         char y[FORMAT_BYTES_MAX];
493                         usec_t left, done;
494
495                         done = n - j->start_usec;
496                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
497
498                         log_info("Got %u%% of %s. %s left at %s/s.",
499                                  percent,
500                                  j->url,
501                                  format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
502                                  format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
503                 } else
504                         log_info("Got %u%% of %s.", percent, j->url);
505
506                 j->progress_percent = percent;
507                 j->last_status_usec = n;
508
509                 if (j->on_progress)
510                         j->on_progress(j);
511         }
512
513         return 0;
514 }
515
516 int pull_job_new(PullJob **ret, const char *url, CurlGlue *glue, void *userdata) {
517         _cleanup_(pull_job_unrefp) PullJob *j = NULL;
518
519         assert(url);
520         assert(glue);
521         assert(ret);
522
523         j = new0(PullJob, 1);
524         if (!j)
525                 return -ENOMEM;
526
527         j->state = PULL_JOB_INIT;
528         j->disk_fd = -1;
529         j->userdata = userdata;
530         j->glue = glue;
531         j->content_length = (uint64_t) -1;
532         j->start_usec = now(CLOCK_MONOTONIC);
533         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
534
535         j->url = strdup(url);
536         if (!j->url)
537                 return -ENOMEM;
538
539         *ret = j;
540         j = NULL;
541
542         return 0;
543 }
544
545 int pull_job_begin(PullJob *j) {
546         int r;
547
548         assert(j);
549
550         if (j->state != PULL_JOB_INIT)
551                 return -EBUSY;
552
553         if (j->grow_machine_directory)
554                 grow_machine_directory();
555
556         r = curl_glue_make(&j->curl, j->url, j);
557         if (r < 0)
558                 return r;
559
560         if (!strv_isempty(j->old_etags)) {
561                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
562
563                 cc = strv_join(j->old_etags, ", ");
564                 if (!cc)
565                         return -ENOMEM;
566
567                 hdr = strappend("If-None-Match: ", cc);
568                 if (!hdr)
569                         return -ENOMEM;
570
571                 if (!j->request_header) {
572                         j->request_header = curl_slist_new(hdr, NULL);
573                         if (!j->request_header)
574                                 return -ENOMEM;
575                 } else {
576                         struct curl_slist *l;
577
578                         l = curl_slist_append(j->request_header, hdr);
579                         if (!l)
580                                 return -ENOMEM;
581
582                         j->request_header = l;
583                 }
584         }
585
586         if (j->request_header) {
587                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
588                         return -EIO;
589         }
590
591         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, pull_job_write_callback) != CURLE_OK)
592                 return -EIO;
593
594         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
595                 return -EIO;
596
597         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, pull_job_header_callback) != CURLE_OK)
598                 return -EIO;
599
600         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
601                 return -EIO;
602
603         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, pull_job_progress_callback) != CURLE_OK)
604                 return -EIO;
605
606         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
607                 return -EIO;
608
609         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
610                 return -EIO;
611
612         r = curl_glue_add(j->glue, j->curl);
613         if (r < 0)
614                 return r;
615
616         j->state = PULL_JOB_ANALYZING;
617
618         return 0;
619 }