chiark / gitweb /
import: improve logging a bit
[elogind.git] / src / import / import-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2015 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "import-job.h"
26
27 ImportJob* import_job_unref(ImportJob *j) {
28         if (!j)
29                 return NULL;
30
31         curl_glue_remove_and_free(j->glue, j->curl);
32         curl_slist_free_all(j->request_header);
33
34         safe_close(j->disk_fd);
35
36         if (j->compressed == IMPORT_JOB_XZ)
37                 lzma_end(&j->xz);
38         else if (j->compressed == IMPORT_JOB_GZIP)
39                 inflateEnd(&j->gzip);
40
41         free(j->url);
42         free(j->etag);
43         strv_free(j->old_etags);
44         free(j->payload);
45
46         free(j);
47
48         return NULL;
49 }
50
51 DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
52
53 static void import_job_finish(ImportJob *j, int ret) {
54         assert(j);
55
56         if (j->state == IMPORT_JOB_DONE ||
57             j->state == IMPORT_JOB_FAILED)
58                 return;
59
60         if (ret == 0) {
61                 j->state = IMPORT_JOB_DONE;
62                 log_info("Download of %s complete.", j->url);
63         } else {
64                 j->state = IMPORT_JOB_FAILED;
65                 j->error = ret;
66         }
67
68         if (j->on_finished)
69                 j->on_finished(j);
70 }
71
72 void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
73         ImportJob *j = NULL;
74         CURLcode code;
75         long status;
76         int r;
77
78         if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
79                 return;
80
81         if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
82                 return;
83
84         if (result != CURLE_OK) {
85                 log_error("Transfer failed: %s", curl_easy_strerror(result));
86                 r = -EIO;
87                 goto finish;
88         }
89
90         code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
91         if (code != CURLE_OK) {
92                 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
93                 r = -EIO;
94                 goto finish;
95         } else if (status == 304) {
96                 log_info("Image already downloaded. Skipping download.");
97                 r = 0;
98                 goto finish;
99         } else if (status >= 300) {
100                 log_error("HTTP request to %s failed with code %li.", j->url, status);
101                 r = -EIO;
102                 goto finish;
103         } else if (status < 200) {
104                 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
105                 r = -EIO;
106                 goto finish;
107         }
108
109         if (j->state != IMPORT_JOB_RUNNING) {
110                 log_error("Premature connection termination.");
111                 r = -EIO;
112                 goto finish;
113         }
114
115         if (j->content_length != (uint64_t) -1 &&
116             j->content_length != j->written_compressed) {
117                 log_error("Download truncated.");
118                 r = -EIO;
119                 goto finish;
120         }
121
122         if (j->disk_fd >= 0 && j->allow_sparse) {
123                 /* Make sure the file size is right, in case the file was
124                  * sparse and we just seeked for the last part */
125
126                 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
127                         log_error_errno(errno, "Failed to truncate file: %m");
128                         r = -errno;
129                         goto finish;
130                 }
131
132                 if (j->etag)
133                         (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
134                 if (j->url)
135                         (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
136
137                 if (j->mtime != 0) {
138                         struct timespec ut[2];
139
140                         timespec_store(&ut[0], j->mtime);
141                         ut[1] = ut[0];
142                         (void) futimens(j->disk_fd, ut);
143
144                         (void) fd_setcrtime(j->disk_fd, j->mtime);
145                 }
146         }
147
148         r = 0;
149
150 finish:
151         import_job_finish(j, r);
152 }
153
154
155 static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
156         ssize_t n;
157
158         assert(j);
159         assert(p);
160         assert(sz > 0);
161         assert(j->disk_fd >= 0);
162
163         if (j->written_uncompressed + sz < j->written_uncompressed) {
164                 log_error("File too large, overflow");
165                 return -EOVERFLOW;
166         }
167
168         if (j->written_uncompressed + sz > j->uncompressed_max) {
169                 log_error("File overly large, refusing");
170                 return -EFBIG;
171         }
172
173         if (j->disk_fd >= 0) {
174
175                 if (j->allow_sparse)
176                         n = sparse_write(j->disk_fd, p, sz, 64);
177                 else
178                         n = write(j->disk_fd, p, sz);
179                 if (n < 0) {
180                         log_error_errno(errno, "Failed to write file: %m");
181                         return -errno;
182                 }
183                 if ((size_t) n < sz) {
184                         log_error("Short write");
185                         return -EIO;
186                 }
187         } else {
188
189                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
190                         return log_oom();
191
192                 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
193                 j->payload_size += sz;
194         }
195
196         j->written_uncompressed += sz;
197
198         return 0;
199 }
200
201 static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
202         int r;
203
204         assert(j);
205         assert(p);
206         assert(sz > 0);
207         assert(j->disk_fd >= 0);
208
209         if (j->written_compressed + sz < j->written_compressed) {
210                 log_error("File too large, overflow");
211                 return -EOVERFLOW;
212         }
213
214         if (j->written_compressed + sz > j->compressed_max) {
215                 log_error("File overly large, refusing.");
216                 return -EFBIG;
217         }
218
219         if (j->content_length != (uint64_t) -1 &&
220             j->written_compressed + sz > j->content_length) {
221                 log_error("Content length incorrect.");
222                 return -EFBIG;
223         }
224
225         switch (j->compressed) {
226
227         case IMPORT_JOB_UNCOMPRESSED:
228                 r = import_job_write_uncompressed(j, p, sz);
229                 if (r < 0)
230                         return r;
231
232                 break;
233
234         case IMPORT_JOB_XZ:
235                 j->xz.next_in = p;
236                 j->xz.avail_in = sz;
237
238                 while (j->xz.avail_in > 0) {
239                         uint8_t buffer[16 * 1024];
240                         lzma_ret lzr;
241
242                         j->xz.next_out = buffer;
243                         j->xz.avail_out = sizeof(buffer);
244
245                         lzr = lzma_code(&j->xz, LZMA_RUN);
246                         if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
247                                 log_error("Decompression error.");
248                                 return -EIO;
249                         }
250
251                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
252                         if (r < 0)
253                                 return r;
254                 }
255
256                 break;
257
258         case IMPORT_JOB_GZIP:
259                 j->gzip.next_in = p;
260                 j->gzip.avail_in = sz;
261
262                 while (j->gzip.avail_in > 0) {
263                         uint8_t buffer[16 * 1024];
264
265                         j->gzip.next_out = buffer;
266                         j->gzip.avail_out = sizeof(buffer);
267
268                         r = inflate(&j->gzip, Z_NO_FLUSH);
269                         if (r != Z_OK && r != Z_STREAM_END) {
270                                 log_error("Decompression error.");
271                                 return -EIO;
272                         }
273
274                         r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
275                         if (r < 0)
276                                 return r;
277                 }
278
279                 break;
280
281         default:
282                 assert_not_reached("Unknown compression");
283         }
284
285         j->written_compressed += sz;
286
287         return 0;
288 }
289
290 static int import_job_open_disk(ImportJob *j) {
291         int r;
292
293         assert(j);
294
295         if (j->on_open_disk) {
296                 r = j->on_open_disk(j);
297                 if (r < 0)
298                         return r;
299         }
300
301         if (j->disk_fd >= 0) {
302                 /* Check if we can do sparse files */
303
304                 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
305                         j->allow_sparse = true;
306                 else {
307                         if (errno != ESPIPE)
308                                 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
309
310                         j->allow_sparse = false;
311                 }
312         }
313
314         return 0;
315 }
316
317 static int import_job_detect_compression(ImportJob *j) {
318         static const uint8_t xz_signature[] = {
319                 0xfd, '7', 'z', 'X', 'Z', 0x00
320         };
321         static const uint8_t gzip_signature[] = {
322                 0x1f, 0x8b
323         };
324
325         _cleanup_free_ uint8_t *stub = NULL;
326         size_t stub_size;
327
328         int r;
329
330         assert(j);
331
332         if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
333                 return 0;
334
335         if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
336                 j->compressed = IMPORT_JOB_XZ;
337         else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
338                 j->compressed = IMPORT_JOB_GZIP;
339         else
340                 j->compressed = IMPORT_JOB_UNCOMPRESSED;
341
342         log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
343         log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
344
345         if (j->compressed == IMPORT_JOB_XZ) {
346                 lzma_ret xzr;
347
348                 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
349                 if (xzr != LZMA_OK) {
350                         log_error("Failed to initialize XZ decoder.");
351                         return -EIO;
352                 }
353         }
354         if (j->compressed == IMPORT_JOB_GZIP) {
355                 r = inflateInit2(&j->gzip, 15+16);
356                 if (r != Z_OK) {
357                         log_error("Failed to initialize gzip decoder.");
358                         return -EIO;
359                 }
360         }
361
362         r = import_job_open_disk(j);
363         if (r < 0)
364                 return r;
365
366         /* Now, take the payload we read so far, and decompress it */
367         stub = j->payload;
368         stub_size = j->payload_size;
369
370         j->payload = NULL;
371         j->payload_size = 0;
372
373         j->state = IMPORT_JOB_RUNNING;
374
375         r = import_job_write_compressed(j, stub, stub_size);
376         if (r < 0)
377                 return r;
378
379         return 0;
380 }
381
382 static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
383         ImportJob *j = userdata;
384         size_t sz = size * nmemb;
385         int r;
386
387         assert(contents);
388         assert(j);
389
390         switch (j->state) {
391
392         case IMPORT_JOB_ANALYZING:
393                 /* Let's first check what it actually is */
394
395                 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
396                         r = log_oom();
397                         goto fail;
398                 }
399
400                 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
401                 j->payload_size += sz;
402
403                 r = import_job_detect_compression(j);
404                 if (r < 0)
405                         goto fail;
406
407                 break;
408
409         case IMPORT_JOB_RUNNING:
410
411                 r = import_job_write_compressed(j, contents, sz);
412                 if (r < 0)
413                         goto fail;
414
415                 break;
416
417         case IMPORT_JOB_DONE:
418         case IMPORT_JOB_FAILED:
419                 r = -ESTALE;
420                 goto fail;
421
422         default:
423                 assert_not_reached("Impossible state.");
424         }
425
426         return sz;
427
428 fail:
429         import_job_finish(j, r);
430         return 0;
431 }
432
433 static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
434         ImportJob *j = userdata;
435         size_t sz = size * nmemb;
436         _cleanup_free_ char *length = NULL, *last_modified = NULL;
437         char *etag;
438         int r;
439
440         assert(contents);
441         assert(j);
442
443         if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
444                 r = -ESTALE;
445                 goto fail;
446         }
447
448         assert(j->state == IMPORT_JOB_ANALYZING);
449
450         r = curl_header_strdup(contents, sz, "ETag:", &etag);
451         if (r < 0) {
452                 log_oom();
453                 goto fail;
454         }
455         if (r > 0) {
456                 free(j->etag);
457                 j->etag = etag;
458
459                 if (strv_contains(j->old_etags, j->etag)) {
460                         log_info("Image already downloaded. Skipping download.");
461                         import_job_finish(j, 0);
462                         return sz;
463                 }
464
465                 return sz;
466         }
467
468         r = curl_header_strdup(contents, sz, "Content-Length:", &length);
469         if (r < 0) {
470                 log_oom();
471                 goto fail;
472         }
473         if (r > 0) {
474                 (void) safe_atou64(length, &j->content_length);
475
476                 if (j->content_length != (uint64_t) -1) {
477                         char bytes[FORMAT_BYTES_MAX];
478
479                         if (j->content_length > j->compressed_max) {
480                                 log_error("Content too large.");
481                                 r = -EFBIG;
482                                 goto fail;
483                         }
484
485                         log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
486                 }
487
488                 return sz;
489         }
490
491         r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
492         if (r < 0) {
493                 log_oom();
494                 goto fail;
495         }
496         if (r > 0) {
497                 (void) curl_parse_http_time(last_modified, &j->mtime);
498                 return sz;
499         }
500
501         return sz;
502
503 fail:
504         import_job_finish(j, r);
505         return 0;
506 }
507
508 static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
509         ImportJob *j = userdata;
510         unsigned percent;
511         usec_t n;
512
513         assert(j);
514
515         if (dltotal <= 0)
516                 return 0;
517
518         percent = ((100 * dlnow) / dltotal);
519         n = now(CLOCK_MONOTONIC);
520
521         if (n > j->last_status_usec + USEC_PER_SEC &&
522             percent != j->progress_percent &&
523             dlnow < dltotal) {
524                 char buf[FORMAT_TIMESPAN_MAX];
525
526                 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
527                         usec_t left, done;
528
529                         done = n - j->start_usec;
530                         left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
531
532                         log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
533                 } else
534                         log_info("Got %u%% of %s.", percent, j->url);
535
536                 j->progress_percent = percent;
537                 j->last_status_usec = n;
538         }
539
540         return 0;
541 }
542
543 int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
544         _cleanup_(import_job_unrefp) ImportJob *j = NULL;
545
546         assert(url);
547         assert(glue);
548         assert(ret);
549
550         j = new0(ImportJob, 1);
551         if (!j)
552                 return -ENOMEM;
553
554         j->state = IMPORT_JOB_INIT;
555         j->disk_fd = -1;
556         j->userdata = userdata;
557         j->glue = glue;
558         j->content_length = (uint64_t) -1;
559         j->start_usec = now(CLOCK_MONOTONIC);
560         j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
561
562         j->url = strdup(url);
563         if (!j->url)
564                 return -ENOMEM;
565
566         *ret = j;
567         j = NULL;
568
569         return 0;
570 }
571
572 int import_job_begin(ImportJob *j) {
573         int r;
574
575         assert(j);
576
577         if (j->state != IMPORT_JOB_INIT)
578                 return -EBUSY;
579
580         r = curl_glue_make(&j->curl, j->url, j);
581         if (r < 0)
582                 return r;
583
584         if (!strv_isempty(j->old_etags)) {
585                 _cleanup_free_ char *cc = NULL, *hdr = NULL;
586
587                 cc = strv_join(j->old_etags, ", ");
588                 if (!cc)
589                         return -ENOMEM;
590
591                 hdr = strappend("If-None-Match: ", cc);
592                 if (!hdr)
593                         return -ENOMEM;
594
595                 j->request_header = curl_slist_new(hdr, NULL);
596                 if (!j->request_header)
597                         return -ENOMEM;
598
599                 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
600                         return -EIO;
601         }
602
603         if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
604                 return -EIO;
605
606         if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
607                 return -EIO;
608
609         if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
610                 return -EIO;
611
612         if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
613                 return -EIO;
614
615         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
616                 return -EIO;
617
618         if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
619                 return -EIO;
620
621         if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
622                 return -EIO;
623
624         r = curl_glue_add(j->glue, j->curl);
625         if (r < 0)
626                 return r;
627
628         j->state = IMPORT_JOB_ANALYZING;
629
630         return 0;
631 }