chiark / gitweb /
ed9af2351ffe4ffe5ab16f8aab808094731b9a71
[elogind.git] / src / import / pull-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "machine-pool.h"
26 #include "pull-job.h"
27
PullJob* pull_job_unref(PullJob *j) {
        /* Destroys a pull job: detaches it from the curl event glue,
         * releases every owned resource and frees the object itself.
         * NULL-safe; always returns NULL so callers can write
         * j = pull_job_unref(j). */

        if (!j)
                return NULL;

        /* Unregister the easy handle from the glue's multi handle and free it
         * before tearing down anything the transfer might still reference */
        curl_glue_remove_and_free(j->glue, j->curl);
        curl_slist_free_all(j->request_header);

        safe_close(j->disk_fd);

        import_compress_free(&j->compress);

        if (j->checksum_context)
                gcry_md_close(j->checksum_context);

        free(j->url);
        free(j->etag);
        strv_free(j->old_etags);
        free(j->payload);
        free(j->checksum);

        free(j);

        return NULL;
}
52
53 static void pull_job_finish(PullJob *j, int ret) {
54         assert(j);
55
56         if (j->state == PULL_JOB_DONE ||
57             j->state == PULL_JOB_FAILED)
58                 return;
59
60         if (ret == 0) {
61                 j->state = PULL_JOB_DONE;
62                 j->progress_percent = 100;
63                 log_info("Download of %s complete.", j->url);
64         } else {
65                 j->state = PULL_JOB_FAILED;
66                 j->error = ret;
67         }
68
69         if (j->on_finished)
70                 j->on_finished(j);
71 }
72
void pull_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
        /* Invoked by the curl glue when a transfer has completed, either
         * successfully or with an error. Validates the HTTP status,
         * verifies the download is complete, finalizes the checksum and
         * fixes up the destination file, then marks the job finished. */
        PullJob *j = NULL;
        CURLcode code;
        long status;
        int r;

        /* Recover our job object from the easy handle's private pointer */
        if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
                return;

        /* Ignore completions for jobs already in a terminal state */
        if (!j || j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED)
                return;

        if (result != CURLE_OK) {
                log_error("Transfer failed: %s", curl_easy_strerror(result));
                r = -EIO;
                goto finish;
        }

        code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
        if (code != CURLE_OK) {
                log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
                r = -EIO;
                goto finish;
        } else if (status == 304) {
                /* 304 Not Modified: one of our If-None-Match ETags matched,
                 * hence the cached copy is still current */
                log_info("Image already downloaded. Skipping download.");
                j->etag_exists = true;
                r = 0;
                goto finish;
        } else if (status >= 300) {
                log_error("HTTP request to %s failed with code %li.", j->url, status);
                r = -EIO;
                goto finish;
        } else if (status < 200) {
                log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
                r = -EIO;
                goto finish;
        }

        /* Payload data should have moved us to RUNNING by now; anything
         * else means the connection ended before any body arrived */
        if (j->state != PULL_JOB_RUNNING) {
                log_error("Premature connection termination.");
                r = -EIO;
                goto finish;
        }

        /* If the server announced a Content-Length, check we received all of it */
        if (j->content_length != (uint64_t) -1 &&
            j->content_length != j->written_compressed) {
                log_error("Download truncated.");
                r = -EIO;
                goto finish;
        }

        if (j->checksum_context) {
                uint8_t *k;

                /* Finalize the SHA256 digest taken over the compressed stream */
                k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
                if (!k) {
                        log_error("Failed to get checksum.");
                        r = -EIO;
                        goto finish;
                }

                j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
                if (!j->checksum) {
                        r = log_oom();
                        goto finish;
                }

                log_debug("SHA256 of %s is %s.", j->url, j->checksum);
        }

        if (j->disk_fd >= 0 && j->allow_sparse) {
                /* Make sure the file size is right, in case the file was
                 * sparse and we just seeked for the last part */

                if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
                        r = log_error_errno(errno, "Failed to truncate file: %m");
                        goto finish;
                }

                /* Record provenance metadata in xattrs; best-effort only */
                if (j->etag)
                        (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
                if (j->url)
                        (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);

                if (j->mtime != 0) {
                        struct timespec ut[2];

                        /* Propagate the server's Last-Modified time onto the
                         * file's atime/mtime (and crtime where supported) */
                        timespec_store(&ut[0], j->mtime);
                        ut[1] = ut[0];
                        (void) futimens(j->disk_fd, ut);

                        (void) fd_setcrtime(j->disk_fd, j->mtime);
                }
        }

        r = 0;

finish:
        pull_job_finish(j, r);
}
173
174 static int pull_job_write_uncompressed(const void *p, size_t sz, void *userdata) {
175         PullJob *j = userdata;
176         ssize_t n;
177
178         assert(j);
179         assert(p);
180
181         if (sz <= 0)
182                 return 0;
183
184         if (j->written_uncompressed + sz < j->written_uncompressed) {
185                 log_error("File too large, overflow");
186                 return -EOVERFLOW;
187         }
188
189         if (j->written_uncompressed + sz > j->uncompressed_max) {
190                 log_error("File overly large, refusing");
191                 return -EFBIG;
192         }
193
194         if (j->disk_fd >= 0) {
195
196                 if (j->grow_machine_directory && j->written_since_last_grow >= GROW_INTERVAL_BYTES) {
197                         j->written_since_last_grow = 0;
198                         grow_machine_directory();
199                 }
200
201                 if (j->allow_sparse)
202                         n = sparse_write(j->disk_fd, p, sz, 64);
203                 else
204                         n = write(j->disk_fd, p, sz);
205                 if (n < 0)
206                         return log_error_errno(errno, "Failed to write file: %m");
207                 if ((size_t) n < sz) {
208                         log_error("Short write");
209                         return -EIO;
210                 }
211         } else {
212
213                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
214                         return log_oom();
215
216                 memcpy(j->payload + j->payload_size, p, sz);
217                 j->payload_size += sz;
218         }
219
220         j->written_uncompressed += sz;
221         j->written_since_last_grow += sz;
222
223         return 0;
224 }
225
226 static int pull_job_write_compressed(PullJob *j, void *p, size_t sz) {
227         int r;
228
229         assert(j);
230         assert(p);
231
232         if (sz <= 0)
233                 return 0;
234
235         if (j->written_compressed + sz < j->written_compressed) {
236                 log_error("File too large, overflow");
237                 return -EOVERFLOW;
238         }
239
240         if (j->written_compressed + sz > j->compressed_max) {
241                 log_error("File overly large, refusing.");
242                 return -EFBIG;
243         }
244
245         if (j->content_length != (uint64_t) -1 &&
246             j->written_compressed + sz > j->content_length) {
247                 log_error("Content length incorrect.");
248                 return -EFBIG;
249         }
250
251         if (j->checksum_context)
252                 gcry_md_write(j->checksum_context, p, sz);
253
254         r = import_uncompress(&j->compress, p, sz, pull_job_write_uncompressed, j);
255         if (r < 0)
256                 return r;
257
258         j->written_compressed += sz;
259
260         return 0;
261 }
262
263 static int pull_job_open_disk(PullJob *j) {
264         int r;
265
266         assert(j);
267
268         if (j->on_open_disk) {
269                 r = j->on_open_disk(j);
270                 if (r < 0)
271                         return r;
272         }
273
274         if (j->disk_fd >= 0) {
275                 /* Check if we can do sparse files */
276
277                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
278                         j->allow_sparse = true;
279                 else {
280                         if (errno != ESPIPE)
281                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
282
283                         j->allow_sparse = false;
284                 }
285         }
286
287         if (j->calc_checksum) {
288                 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
289                         log_error("Failed to initialize hash context.");
290                         return -EIO;
291                 }
292         }
293
294         return 0;
295 }
296
static int pull_job_detect_compression(PullJob *j) {
        /* Tries to determine the compression format from the payload
         * buffered so far. Returns 0 without side effects if more data is
         * still needed; once detection succeeds, opens the destination,
         * replays the buffered payload through the decompressor and moves
         * the job into the RUNNING state. */
        _cleanup_free_ uint8_t *stub = NULL;
        size_t stub_size;

        int r;

        assert(j);

        r = import_uncompress_detect(&j->compress, j->payload, j->payload_size);
        if (r < 0)
                return log_error_errno(r, "Failed to initialize compressor: %m");
        if (r == 0)
                /* Not enough data yet to decide; keep buffering */
                return 0;

        log_debug("Stream is compressed: %s", import_compress_type_to_string(j->compress.type));

        r = pull_job_open_disk(j);
        if (r < 0)
                return r;

        /* Now, take the payload we read so far, and decompress it */
        /* Ownership of the buffer moves to "stub", which _cleanup_free_
         * releases when this function returns */
        stub = j->payload;
        stub_size = j->payload_size;

        /* Reset the job's payload buffer so subsequent data starts fresh */
        j->payload = NULL;
        j->payload_size = 0;
        j->payload_allocated = 0;

        j->state = PULL_JOB_RUNNING;

        r = pull_job_write_compressed(j, stub, stub_size);
        if (r < 0)
                return r;

        return 0;
}
333
static size_t pull_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
        /* libcurl CURLOPT_WRITEFUNCTION callback: receives body data as it
         * arrives. Per the curl contract, returning anything other than
         * size*nmemb aborts the transfer — which is how failures below are
         * propagated (returning 0 after marking the job failed). */
        PullJob *j = userdata;
        size_t sz = size * nmemb;
        int r;

        assert(contents);
        assert(j);

        switch (j->state) {

        case PULL_JOB_ANALYZING:
                /* Let's first check what it actually is */

                /* Buffer the data until compression detection succeeds */
                if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
                        r = log_oom();
                        goto fail;
                }

                memcpy(j->payload + j->payload_size, contents, sz);
                j->payload_size += sz;

                r = pull_job_detect_compression(j);
                if (r < 0)
                        goto fail;

                break;

        case PULL_JOB_RUNNING:

                /* Format known; stream straight through the decompressor */
                r = pull_job_write_compressed(j, contents, sz);
                if (r < 0)
                        goto fail;

                break;

        case PULL_JOB_DONE:
        case PULL_JOB_FAILED:
                /* Data after completion: the job is stale */
                r = -ESTALE;
                goto fail;

        default:
                assert_not_reached("Impossible state.");
        }

        return sz;

fail:
        pull_job_finish(j, r);
        return 0;
}
384
static size_t pull_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
        /* libcurl CURLOPT_HEADERFUNCTION callback: invoked once per response
         * header line. Extracts ETag, Content-Length and Last-Modified, and
         * hands everything else to the user's on_header hook. Returning a
         * value != size*nmemb aborts the transfer (used on failure). */
        PullJob *j = userdata;
        size_t sz = size * nmemb;
        _cleanup_free_ char *length = NULL, *last_modified = NULL;
        char *etag;
        int r;

        assert(contents);
        assert(j);

        if (j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED) {
                r = -ESTALE;
                goto fail;
        }

        /* Headers arrive before any body data, i.e. before we leave ANALYZING */
        assert(j->state == PULL_JOB_ANALYZING);

        r = curl_header_strdup(contents, sz, "ETag:", &etag);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Replace any previously seen ETag; ownership moves to the job */
                free(j->etag);
                j->etag = etag;

                /* If this ETag matches one we already have locally, the
                 * download can be skipped entirely */
                if (strv_contains(j->old_etags, j->etag)) {
                        log_info("Image already downloaded. Skipping download.");
                        j->etag_exists = true;
                        pull_job_finish(j, 0);
                        return sz;
                }

                return sz;
        }

        r = curl_header_strdup(contents, sz, "Content-Length:", &length);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Parse failure leaves content_length at (uint64_t) -1, i.e. unknown */
                (void) safe_atou64(length, &j->content_length);

                if (j->content_length != (uint64_t) -1) {
                        char bytes[FORMAT_BYTES_MAX];

                        /* Reject early if the announced size already exceeds our limit */
                        if (j->content_length > j->compressed_max) {
                                log_error("Content too large.");
                                r = -EFBIG;
                                goto fail;
                        }

                        log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
                }

                return sz;
        }

        r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Best-effort: an unparsable date simply leaves mtime untouched */
                (void) curl_parse_http_time(last_modified, &j->mtime);
                return sz;
        }

        /* Not a header we handle ourselves; offer it to the user callback */
        if (j->on_header) {
                r = j->on_header(j, contents, sz);
                if (r < 0)
                        goto fail;
        }

        return sz;

fail:
        pull_job_finish(j, r);
        return 0;
}
466
static int pull_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
        /* libcurl CURLOPT_XFERINFOFUNCTION callback: logs download progress
         * (with an ETA/rate estimate once enough time has passed), rate
         * limited to at most one status line per second and only on actual
         * percentage changes. Returns 0 to let the transfer continue. */
        PullJob *j = userdata;
        unsigned percent;
        usec_t n;

        assert(j);

        /* Total size unknown (or nothing to do): no meaningful percentage */
        if (dltotal <= 0)
                return 0;

        percent = ((100 * dlnow) / dltotal);
        n = now(CLOCK_MONOTONIC);

        /* Only report if >1s elapsed since the last report, the percentage
         * moved, and we are not at the very end (completion is logged by
         * pull_job_finish() instead) */
        if (n > j->last_status_usec + USEC_PER_SEC &&
            percent != j->progress_percent &&
            dlnow < dltotal) {
                char buf[FORMAT_TIMESPAN_MAX];

                /* Rate/ETA is only meaningful after >1s of transfer and with
                 * some data received (also guards the division by dlnow) */
                if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
                        char y[FORMAT_BYTES_MAX];
                        usec_t left, done;

                        /* Linear extrapolation: time_left = total_time_estimate - elapsed */
                        done = n - j->start_usec;
                        left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;

                        log_info("Got %u%% of %s. %s left at %s/s.",
                                 percent,
                                 j->url,
                                 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
                                 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
                } else
                        log_info("Got %u%% of %s.", percent, j->url);

                j->progress_percent = percent;
                j->last_status_usec = n;

                if (j->on_progress)
                        j->on_progress(j);
        }

        return 0;
}
509
510 int pull_job_new(PullJob **ret, const char *url, CurlGlue *glue, void *userdata) {
511         _cleanup_(pull_job_unrefp) PullJob *j = NULL;
512
513         assert(url);
514         assert(glue);
515         assert(ret);
516
517         j = new0(PullJob, 1);
518         if (!j)
519                 return -ENOMEM;
520
521         j->state = PULL_JOB_INIT;
522         j->disk_fd = -1;
523         j->userdata = userdata;
524         j->glue = glue;
525         j->content_length = (uint64_t) -1;
526         j->start_usec = now(CLOCK_MONOTONIC);
527         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
528
529         j->url = strdup(url);
530         if (!j->url)
531                 return -ENOMEM;
532
533         *ret = j;
534         j = NULL;
535
536         return 0;
537 }
538
int pull_job_begin(PullJob *j) {
        /* Starts the download: creates the curl easy handle, attaches the
         * If-None-Match header built from previously seen ETags, wires up
         * the write/header/progress callbacks and registers the handle with
         * the glue's event loop. Only valid on a job in the INIT state.
         *
         * Returns 0 on success, -EBUSY if already started, -ENOMEM/-EIO on
         * setup failures. */
        int r;

        assert(j);

        if (j->state != PULL_JOB_INIT)
                return -EBUSY;

        /* Make room in the machine pool before we start writing to it */
        if (j->grow_machine_directory)
                grow_machine_directory();

        r = curl_glue_make(&j->curl, j->url, j);
        if (r < 0)
                return r;

        /* Advertise all cached ETags so the server can answer 304 */
        if (!strv_isempty(j->old_etags)) {
                _cleanup_free_ char *cc = NULL, *hdr = NULL;

                cc = strv_join(j->old_etags, ", ");
                if (!cc)
                        return -ENOMEM;

                hdr = strappend("If-None-Match: ", cc);
                if (!hdr)
                        return -ENOMEM;

                if (!j->request_header) {
                        j->request_header = curl_slist_new(hdr, NULL);
                        if (!j->request_header)
                                return -ENOMEM;
                } else {
                        struct curl_slist *l;

                        /* Append to the existing list; only update the stored
                         * head pointer once the append succeeded */
                        l = curl_slist_append(j->request_header, hdr);
                        if (!l)
                                return -ENOMEM;

                        j->request_header = l;
                }
        }

        if (j->request_header) {
                if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
                        return -EIO;
        }

        /* Body data → pull_job_write_callback */
        if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, pull_job_write_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
                return -EIO;

        /* Response headers → pull_job_header_callback */
        if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, pull_job_header_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
                return -EIO;

        /* Progress reporting → pull_job_progress_callback; NOPROGRESS must
         * be cleared for the xferinfo callback to actually fire */
        if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, pull_job_progress_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
                return -EIO;

        /* Hand the configured handle to the event loop; transfer starts now */
        r = curl_glue_add(j->glue, j->curl);
        if (r < 0)
                return r;

        j->state = PULL_JOB_ANALYZING;

        return 0;
}