chiark / gitweb /
37f8ef76e4d3416408eb8578d8a95bafe53e9e5b
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "import-job.h"
26
27 ImportJob* import_job_unref(ImportJob *j) {
28         if (!j)
29                 return NULL;
30
31         curl_glue_remove_and_free(j->glue, j->curl);
32         curl_slist_free_all(j->request_header);
33
34         safe_close(j->disk_fd);
35
36         if (j->compressed == IMPORT_JOB_XZ)
37                 lzma_end(&j->xz);
38         else if (j->compressed == IMPORT_JOB_GZIP)
39                 inflateEnd(&j->gzip);
40
41         free(j->url);
42         free(j->etag);
43         strv_free(j->old_etags);
44         free(j->payload);
45
46         free(j);
47
48         return NULL;
49 }
50
51 DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
52
53 static void import_job_finish(ImportJob *j, int ret) {
54         assert(j);
55
56         if (j->state == IMPORT_JOB_DONE ||
57             j->state == IMPORT_JOB_FAILED)
58                 return;
59
60         if (ret == 0) {
61                 j->state = IMPORT_JOB_DONE;
62                 log_info("Download of %s complete.", j->url);
63         } else {
64                 j->state = IMPORT_JOB_FAILED;
65                 j->error = ret;
66         }
67
68         if (j->on_finished)
69                 j->on_finished(j);
70 }
71
72 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
73         ImportJob *j = NULL;
74         CURLcode code;
75         long status;
76         int r;
77
78         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
79                 return;
80
81         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
82                 return;
83
84         if (result != CURLE_OK) {
85                 log_error("Transfer failed: %s", curl_easy_strerror(result));
86                 r = -EIO;
87                 goto finish;
88         }
89
90         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
91         if (code != CURLE_OK) {
92                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
93                 r = -EIO;
94                 goto finish;
95         } else if (status == 304) {
96                 log_info("Image already downloaded. Skipping download.");
97                 r = 0;
98                 goto finish;
99         } else if (status >= 300) {
100                 log_error("HTTP request to %s failed with code %li.", j->url, status);
101                 r = -EIO;
102                 goto finish;
103         } else if (status < 200) {
104                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
105                 r = -EIO;
106                 goto finish;
107         }
108
109         if (j->state != IMPORT_JOB_RUNNING) {
110                 log_error("Premature connection termination.");
111                 r = -EIO;
112                 goto finish;
113         }
114
115         if (j->content_length != (uint64_t) -1 &&
116             j->content_length != j->written_compressed) {
117                 log_error("Download truncated.");
118                 r = -EIO;
119                 goto finish;
120         }
121
122         if (j->disk_fd >= 0 && j->allow_sparse) {
123                 /* Make sure the file size is right, in case the file was
124                  * sparse and we just seeked for the last part */
125
126                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
127                         log_error_errno(errno, "Failed to truncate file: %m");
128                         r = -errno;
129                         goto finish;
130                 }
131
132                 if (j->etag)
133                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
134                 if (j->url)
135                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
136
137                 if (j->mtime != 0) {
138                         struct timespec ut[2];
139
140                         timespec_store(&ut[0], j->mtime);
141                         ut[1] = ut[0];
142                         (void) futimens(j->disk_fd, ut);
143
144                         (void) fd_setcrtime(j->disk_fd, j->mtime);
145                 }
146         }
147
148         r = 0;
149
150 finish:
151         import_job_finish(j, r);
152 }
153
154
155 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
156         ssize_t n;
157
158         assert(j);
159         assert(p);
160         assert(sz > 0);
161         assert(j->disk_fd >= 0);
162
163         if (j->written_uncompressed + sz < j->written_uncompressed) {
164                 log_error("File too large, overflow");
165                 return -EOVERFLOW;
166         }
167
168         if (j->written_uncompressed + sz > j->uncompressed_max) {
169                 log_error("File overly large, refusing");
170                 return -EFBIG;
171         }
172
173         if (j->disk_fd >= 0) {
174
175                 if (j->allow_sparse)
176                         n = sparse_write(j->disk_fd, p, sz, 64);
177                 else
178                         n = write(j->disk_fd, p, sz);
179                 if (n < 0) {
180                         log_error_errno(errno, "Failed to write file: %m");
181                         return -errno;
182                 }
183                 if ((size_t) n < sz) {
184                         log_error("Short write");
185                         return -EIO;
186                 }
187         } else {
188
189                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
190                         return log_oom();
191
192                 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
193                 j->payload_size += sz;
194         }
195
196         j->written_uncompressed += sz;
197
198         return 0;
199 }
200
201 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
202         int r;
203
204         assert(j);
205         assert(p);
206         assert(sz > 0);
207         assert(j->disk_fd >= 0);
208
209         if (j->written_compressed + sz < j->written_compressed) {
210                 log_error("File too large, overflow");
211                 return -EOVERFLOW;
212         }
213
214         if (j->written_compressed + sz > j->compressed_max) {
215                 log_error("File overly large, refusing.");
216                 return -EFBIG;
217         }
218
219         if (j->content_length != (uint64_t) -1 &&
220             j->written_compressed + sz > j->content_length) {
221                 log_error("Content length incorrect.");
222                 return -EFBIG;
223         }
224
225         switch (j->compressed) {
226
227         case IMPORT_JOB_UNCOMPRESSED:
228                 r = import_job_write_uncompressed(j, p, sz);
229                 if (r < 0)
230                         return r;
231
232                 break;
233
234         case IMPORT_JOB_XZ:
235                 j->xz.next_in = p;
236                 j->xz.avail_in = sz;
237
238                 while (j->xz.avail_in > 0) {
239                         uint8_t buffer[16 * 1024];
240                         lzma_ret lzr;
241
242                         j->xz.next_out = buffer;
243                         j->xz.avail_out = sizeof(buffer);
244
245                         lzr = lzma_code(&j->xz, LZMA_RUN);
246                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
247                                 log_error("Decompression error.");
248                                 return -EIO;
249                         }
250
251                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
252                         if (r < 0)
253                                 return r;
254                 }
255
256                 break;
257
258         case IMPORT_JOB_GZIP:
259                 j->gzip.next_in = p;
260                 j->gzip.avail_in = sz;
261
262                 while (j->gzip.avail_in > 0) {
263                         uint8_t buffer[16 * 1024];
264
265                         j->gzip.next_out = buffer;
266                         j->gzip.avail_out = sizeof(buffer);
267
268                         r = inflate(&j->gzip, Z_NO_FLUSH);
269                         if (r != Z_OK && r != Z_STREAM_END) {
270                                 log_error("Decompression error.");
271                                 return -EIO;
272                         }
273
274                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
275                         if (r < 0)
276                                 return r;
277                 }
278
279                 break;
280
281         default:
282                 assert_not_reached("Unknown compression");
283         }
284
285         j->written_compressed += sz;
286
287         return 0;
288 }
289
290 static int import_job_open_disk(ImportJob *j) {
291         int r;
292
293         assert(j);
294
295         if (j->on_open_disk) {
296                 r = j->on_open_disk(j);
297                 if (r < 0)
298                         return r;
299         }
300
301         if (j->disk_fd >= 0) {
302                 /* Check if we can do sparse files */
303
304                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
305                         j->allow_sparse = true;
306                 else {
307                         if (errno != ESPIPE)
308                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
309
310                         j->allow_sparse = false;
311                 }
312         }
313
314         return 0;
315 }
316
317 static int import_job_detect_compression(ImportJob *j) {
318         static const uint8_t xz_signature[] = {
319                 0xfd, '7', 'z', 'X', 'Z', 0x00
320         };
321         static const uint8_t gzip_signature[] = {
322                 0x1f, 0x8b
323         };
324
325         _cleanup_free_ uint8_t *stub = NULL;
326         size_t stub_size;
327
328         int r;
329
330         assert(j);
331
332         if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
333                 return 0;
334
335         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
336                 j->compressed = IMPORT_JOB_XZ;
337         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
338                 j->compressed = IMPORT_JOB_GZIP;
339         else
340                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
341
342         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
343         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
344
345         if (j->compressed == IMPORT_JOB_XZ) {
346                 lzma_ret xzr;
347
348                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
349                 if (xzr != LZMA_OK) {
350                         log_error("Failed to initialize XZ decoder.");
351                         return -EIO;
352                 }
353         }
354         if (j->compressed == IMPORT_JOB_GZIP) {
355                 r = inflateInit2(&j->gzip, 15+16);
356                 if (r != Z_OK) {
357                         log_error("Failed to initialize gzip decoder.");
358                         return -EIO;
359                 }
360         }
361
362         r = import_job_open_disk(j);
363         if (r < 0)
364                 return r;
365
366         /* Now, take the payload we read so far, and decompress it */
367         stub = j->payload;
368         stub_size = j->payload_size;
369
370         j->payload = NULL;
371         j->payload_size = 0;
372         j->payload_allocated = 0;
373
374         j->state = IMPORT_JOB_RUNNING;
375
376         r = import_job_write_compressed(j, stub, stub_size);
377         if (r < 0)
378                 return r;
379
380         return 0;
381 }
382
383 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
384         ImportJob *j = userdata;
385         size_t sz = size * nmemb;
386         int r;
387
388         assert(contents);
389         assert(j);
390
391         switch (j->state) {
392
393         case IMPORT_JOB_ANALYZING:
394                 /* Let's first check what it actually is */
395
396                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
397                         r = log_oom();
398                         goto fail;
399                 }
400
401                 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
402                 j->payload_size += sz;
403
404                 r = import_job_detect_compression(j);
405                 if (r < 0)
406                         goto fail;
407
408                 break;
409
410         case IMPORT_JOB_RUNNING:
411
412                 r = import_job_write_compressed(j, contents, sz);
413                 if (r < 0)
414                         goto fail;
415
416                 break;
417
418         case IMPORT_JOB_DONE:
419         case IMPORT_JOB_FAILED:
420                 r = -ESTALE;
421                 goto fail;
422
423         default:
424                 assert_not_reached("Impossible state.");
425         }
426
427         return sz;
428
429 fail:
430         import_job_finish(j, r);
431         return 0;
432 }
433
434 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
435         ImportJob *j = userdata;
436         size_t sz = size * nmemb;
437         _cleanup_free_ char *length = NULL, *last_modified = NULL;
438         char *etag;
439         int r;
440
441         assert(contents);
442         assert(j);
443
444         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
445                 r = -ESTALE;
446                 goto fail;
447         }
448
449         assert(j->state == IMPORT_JOB_ANALYZING);
450
451         r = curl_header_strdup(contents, sz, "ETag:", &etag);
452         if (r < 0) {
453                 log_oom();
454                 goto fail;
455         }
456         if (r > 0) {
457                 free(j->etag);
458                 j->etag = etag;
459
460                 if (strv_contains(j->old_etags, j->etag)) {
461                         log_info("Image already downloaded. Skipping download.");
462                         import_job_finish(j, 0);
463                         return sz;
464                 }
465
466                 return sz;
467         }
468
469         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
470         if (r < 0) {
471                 log_oom();
472                 goto fail;
473         }
474         if (r > 0) {
475                 (void) safe_atou64(length, &j->content_length);
476
477                 if (j->content_length != (uint64_t) -1) {
478                         char bytes[FORMAT_BYTES_MAX];
479
480                         if (j->content_length > j->compressed_max) {
481                                 log_error("Content too large.");
482                                 r = -EFBIG;
483                                 goto fail;
484                         }
485
486                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
487                 }
488
489                 return sz;
490         }
491
492         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
493         if (r < 0) {
494                 log_oom();
495                 goto fail;
496         }
497         if (r > 0) {
498                 (void) curl_parse_http_time(last_modified, &j->mtime);
499                 return sz;
500         }
501
502         return sz;
503
504 fail:
505         import_job_finish(j, r);
506         return 0;
507 }
508
509 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
510         ImportJob *j = userdata;
511         unsigned percent;
512         usec_t n;
513
514         assert(j);
515
516         if (dltotal <= 0)
517                 return 0;
518
519         percent = ((100 * dlnow) / dltotal);
520         n = now(CLOCK_MONOTONIC);
521
522         if (n > j->last_status_usec + USEC_PER_SEC &&
523             percent != j->progress_percent &&
524             dlnow < dltotal) {
525                 char buf[FORMAT_TIMESPAN_MAX];
526
527                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
528                         usec_t left, done;
529
530                         done = n - j->start_usec;
531                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
532
533                         log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
534                 } else
535                         log_info("Got %u%% of %s.", percent, j->url);
536
537                 j->progress_percent = percent;
538                 j->last_status_usec = n;
539         }
540
541         return 0;
542 }
543
544 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
545         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
546
547         assert(url);
548         assert(glue);
549         assert(ret);
550
551         j = new0(ImportJob, 1);
552         if (!j)
553                 return -ENOMEM;
554
555         j->state = IMPORT_JOB_INIT;
556         j->disk_fd = -1;
557         j->userdata = userdata;
558         j->glue = glue;
559         j->content_length = (uint64_t) -1;
560         j->start_usec = now(CLOCK_MONOTONIC);
561         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
562
563         j->url = strdup(url);
564         if (!j->url)
565                 return -ENOMEM;
566
567         *ret = j;
568         j = NULL;
569
570         return 0;
571 }
572
573 int import_job_begin(ImportJob *j) {
574         int r;
575
576         assert(j);
577
578         if (j->state != IMPORT_JOB_INIT)
579                 return -EBUSY;
580
581         r = curl_glue_make(&j->curl, j->url, j);
582         if (r < 0)
583                 return r;
584
585         if (!strv_isempty(j->old_etags)) {
586                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
587
588                 cc = strv_join(j->old_etags, ", ");
589                 if (!cc)
590                         return -ENOMEM;
591
592                 hdr = strappend("If-None-Match: ", cc);
593                 if (!hdr)
594                         return -ENOMEM;
595
596                 j->request_header = curl_slist_new(hdr, NULL);
597                 if (!j->request_header)
598                         return -ENOMEM;
599
600                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
601                         return -EIO;
602         }
603
604         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
605                 return -EIO;
606
607         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
608                 return -EIO;
609
610         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
611                 return -EIO;
612
613         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
614                 return -EIO;
615
616         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
617                 return -EIO;
618
619         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
620                 return -EIO;
621
622         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
623                 return -EIO;
624
625         r = curl_glue_add(j->glue, j->curl);
626         if (r < 0)
627                 return r;
628
629         j->state = IMPORT_JOB_ANALYZING;
630
631         return 0;
632 }