chiark / gitweb /
import: simplify dkr importer, by making use of generic import-job logic, used by...
[elogind.git] / src / import / import-dkr.c
index 8dfd2707eed539681bcfaf57bf4da52204aa6a73..cebec28144f4851278baa5c1082af636847f588a 100644 (file)
 #include <curl/curl.h>
 #include <sys/prctl.h>
 
-#include "hashmap.h"
 #include "set.h"
 #include "json.h"
 #include "strv.h"
-#include "curl-util.h"
-#include "import-dkr.h"
 #include "btrfs-util.h"
-#include "aufs-util.h"
 #include "utf8.h"
+#include "mkdir.h"
+#include "curl-util.h"
+#include "aufs-util.h"
+#include "import-util.h"
+#include "import-job.h"
+#include "import-dkr.h"
 
-/* TODO:
-  - convert json bits
-  - man page
-  - fall back to btrfs loop pool device
-*/
-
-typedef struct DkrImportJob DkrImportJob;
-typedef struct DkrImportName DkrImportName;
-
-typedef enum DkrImportJobType {
-        DKR_IMPORT_JOB_IMAGES,
-        DKR_IMPORT_JOB_TAGS,
-        DKR_IMPORT_JOB_ANCESTRY,
-        DKR_IMPORT_JOB_JSON,
-        DKR_IMPORT_JOB_LAYER,
-} DkrImportJobType;
-
-struct DkrImportJob {
-        DkrImport *import;
-        DkrImportJobType type;
-        bool done;
-
-        char *url;
-
-        Set *needed_by; /* DkrImport Name objects */
-
-        CURL *curl;
-        struct curl_slist *request_header;
-        void *payload;
-        size_t payload_size;
-
-        char *response_token;
-        char **response_registries;
-
-        char *temp_path;
-        char *final_path;
+struct DkrImport {
+        sd_event *event;
+        CurlGlue *glue;
 
-        pid_t tar_pid;
-        FILE *tar_stream;
-};
+        char *index_url;
+        char *image_root;
 
-struct DkrImportName {
-        DkrImport *import;
+        ImportJob *images_job;
+        ImportJob *tags_job;
+        ImportJob *ancestry_job;
+        ImportJob *json_job;
+        ImportJob *layer_job;
 
         char *name;
         char *tag;
         char *id;
-        char *local;
 
-        DkrImportJob *job_images, *job_tags, *job_ancestry, *job_json, *job_layer;
+        char *response_token;
+        char **response_registries;
 
         char **ancestry;
         unsigned current_ancestry;
 
-        bool force_local;
-};
-
-struct DkrImport {
-        sd_event *event;
-        CurlGlue *glue;
-
-        char *index_url;
-        char *image_root;
+        DkrImportFinished on_finished;
+        void *userdata;
 
-        Hashmap *names;
-        Hashmap *jobs;
+        char *local;
+        bool force_local;
 
-        dkr_import_on_finished on_finished;
-        void *userdata;
+        char *temp_path;
+        char *final_path;
 
-        bool finished;
+        pid_t tar_pid;
 };
 
 #define PROTOCOL_PREFIX "https://"
@@ -110,89 +74,105 @@ struct DkrImport {
 #define HEADER_TOKEN "X-Do" /* the HTTP header for the auth token */ "cker-Token:"
 #define HEADER_REGISTRY "X-Do" /*the HTTP header for the registry */ "cker-Endpoints:"
 
-#define PAYLOAD_MAX (16*1024*1024)
 #define LAYERS_MAX 2048
 
-static int dkr_import_name_add_job(DkrImportName *name, DkrImportJobType type, const char *url, DkrImportJob **ret);
+static void dkr_import_job_on_finished(ImportJob *j);
 
-static DkrImportJob *dkr_import_job_unref(DkrImportJob *job) {
-        if (!job)
+DkrImport* dkr_import_unref(DkrImport *i) {
+        if (!i)
                 return NULL;
 
-        if (job->import)
-                curl_glue_remove_and_free(job->import->glue, job->curl);
-        curl_slist_free_all(job->request_header);
-
-        if (job->tar_stream)
-                fclose(job->tar_stream);
-
-        free(job->final_path);
-
-        if (job->temp_path) {
-                btrfs_subvol_remove(job->temp_path);
-                free(job->temp_path);
+        if (i->tar_pid > 1) {
+                (void) kill_and_sigcont(i->tar_pid, SIGKILL);
+                (void) wait_for_terminate(i->tar_pid, NULL);
         }
 
-        set_free(job->needed_by);
+        import_job_unref(i->images_job);
+        import_job_unref(i->tags_job);
+        import_job_unref(i->ancestry_job);
+        import_job_unref(i->json_job);
+        import_job_unref(i->layer_job);
 
-        if (job->tar_pid > 0)
-                kill(job->tar_pid, SIGTERM);
+        curl_glue_unref(i->glue);
+        sd_event_unref(i->event);
 
-        free(job->url);
-        free(job->payload);
-        free(job->response_token);
-        strv_free(job->response_registries);
+        if (i->temp_path) {
+                (void) btrfs_subvol_remove(i->temp_path);
+                (void) rm_rf_dangerous(i->temp_path, false, true, false);
+                free(i->temp_path);
+        }
 
-        free(job);
+        free(i->name);
+        free(i->tag);
+        free(i->id);
+        free(i->response_token);
+        free(i->response_registries);
+        strv_free(i->ancestry);
+        free(i->final_path);
+        free(i->index_url);
+        free(i->image_root);
+        free(i->local);
+        free(i);
 
         return NULL;
 }
 
-static DkrImportName *dkr_import_name_unref(DkrImportName *name) {
-        if (!name)
-                return NULL;
+int dkr_import_new(
+                DkrImport **ret,
+                sd_event *event,
+                const char *index_url,
+                const char *image_root,
+                DkrImportFinished on_finished,
+                void *userdata) {
 
-        if (name->job_images)
-                set_remove(name->job_images->needed_by, name);
+        _cleanup_(dkr_import_unrefp) DkrImport *i = NULL;
+        char *e;
+        int r;
 
-        if (name->job_tags)
-                set_remove(name->job_tags->needed_by, name);
+        assert(ret);
+        assert(index_url);
 
-        if (name->job_ancestry)
-                set_remove(name->job_ancestry->needed_by, name);
+        if (!http_url_is_valid(index_url))
+                return -EINVAL;
 
-        if (name->job_json)
-                set_remove(name->job_json->needed_by, name);
+        i = new0(DkrImport, 1);
+        if (!i)
+                return -ENOMEM;
 
-        if (name->job_layer)
-                set_remove(name->job_layer->needed_by, name);
+        i->on_finished = on_finished;
+        i->userdata = userdata;
 
-        free(name->name);
-        free(name->id);
-        free(name->tag);
-        free(name->local);
+        i->image_root = strdup(image_root ?: "/var/lib/machines");
+        if (!i->image_root)
+                return -ENOMEM;
 
-        strv_free(name->ancestry);
-        free(name);
+        i->index_url = strdup(index_url);
+        if (!i->index_url)
+                return -ENOMEM;
 
-        return NULL;
-}
+        e = endswith(i->index_url, "/");
+        if (e)
+                *e = 0;
 
-DEFINE_TRIVIAL_CLEANUP_FUNC(DkrImportJob*, dkr_import_job_unref);
-DEFINE_TRIVIAL_CLEANUP_FUNC(DkrImportName*, dkr_import_name_unref);
+        if (event)
+                i->event = sd_event_ref(event);
+        else {
+                r = sd_event_default(&i->event);
+                if (r < 0)
+                        return r;
+        }
 
-static void dkr_import_finish(DkrImport *import, int error) {
-        assert(import);
+        r = curl_glue_new(&i->glue, i->event);
+        if (r < 0)
+                return r;
 
-        if (import->finished)
-                return;
+        i->glue->on_finished = import_job_curl_on_finished;
+        i->glue->userdata = i;
 
-        import->finished = true;
+        *ret = i;
+        i = NULL;
 
-        if (import->on_finished)
-                import->on_finished(import, error, import->userdata);
-        else
-                sd_event_exit(import->event, error);
+        return 0;
 }
 
 static int parse_id(const void *payload, size_t size, char **ret) {
@@ -334,135 +314,134 @@ static int parse_ancestry(const void *payload, size_t size, char ***ret) {
         }
 }
 
-static const char *dkr_import_name_current_layer(DkrImportName *name) {
-        assert(name);
+static const char *dkr_import_current_layer(DkrImport *i) {
+        assert(i);
 
-        if (strv_isempty(name->ancestry))
+        if (strv_isempty(i->ancestry))
                 return NULL;
 
-        return name->ancestry[name->current_ancestry];
+        return i->ancestry[i->current_ancestry];
 }
 
-static const char *dkr_import_name_current_base_layer(DkrImportName *name) {
-        assert(name);
+static const char *dkr_import_current_base_layer(DkrImport *i) {
+        assert(i);
 
-        if (strv_isempty(name->ancestry))
+        if (strv_isempty(i->ancestry))
                 return NULL;
 
-        if (name->current_ancestry <= 0)
+        if (i->current_ancestry <= 0)
                 return NULL;
 
-        return name->ancestry[name->current_ancestry-1];
+        return i->ancestry[i->current_ancestry-1];
 }
 
-static char** dkr_import_name_get_registries(DkrImportName *name) {
-        assert(name);
-
-        if (!name->job_images)
-                return NULL;
-
-        if (!name->job_images->done)
-                return NULL;
-
-        if (strv_isempty(name->job_images->response_registries))
-                return NULL;
-
-        return name->job_images->response_registries;
-}
+static int dkr_import_add_token(DkrImport *i, ImportJob *j) {
+        const char *t;
 
-static const char*dkr_import_name_get_token(DkrImportName *name) {
-        assert(name);
+        assert(i);
+        assert(j);
 
-        if (!name->job_images)
-                return NULL;
+        if (i->response_token)
+                t = strappenda("Authorization: Token ", i->response_token);
+        else
+                t = HEADER_TOKEN " true";
 
-        if (!name->job_images->done)
-                return NULL;
+        j->request_header = curl_slist_new("Accept: application/json", t, NULL);
+        if (!j->request_header)
+                return -ENOMEM;
 
-        return name->job_images->response_token;
+        return 0;
 }
 
-static void dkr_import_name_maybe_finish(DkrImportName *name) {
-        int r;
-
-        assert(name);
+static bool dkr_import_is_done(DkrImport *i) {
+        assert(i);
+        assert(i->images_job);
 
-        if (!name->job_images || !name->job_images->done)
-                return;
+        if (i->images_job->state != IMPORT_JOB_DONE)
+                return false;
 
-        if (!name->job_ancestry || !name->job_ancestry->done)
-                return;
+        if (!i->tags_job || i->tags_job->state != IMPORT_JOB_DONE)
+                return false;
 
-        if (!name->job_json || !name->job_json->done)
-                return;
+        if (!i->ancestry_job || i->ancestry_job->state != IMPORT_JOB_DONE)
+                return false;
 
-        if (name->job_layer && !name->job_json->done)
-                return;
+        if (!i->json_job || i->json_job->state != IMPORT_JOB_DONE)
+                return false;
 
-        if (dkr_import_name_current_layer(name))
-                return;
+        if (i->layer_job && i->layer_job->state != IMPORT_JOB_DONE)
+                return false;
 
-        if (name->local) {
-                const char *p, *q;
+        if (dkr_import_current_layer(i))
+                return false;
 
-                assert(name->id);
+        return true;
+}
 
-                p = strappenda(name->import->image_root, "/", name->local);
-                q = strappenda(name->import->image_root, "/.dkr-", name->id);
+static int dkr_import_make_local_copy(DkrImport *i) {
+        int r;
 
-                if (name->force_local) {
-                        (void) btrfs_subvol_remove(p);
-                        (void) rm_rf_dangerous(p, false, true, false);
-                }
+        assert(i);
 
-                r = btrfs_subvol_snapshot(q, p, false, false);
-                if (r < 0) {
-                        log_error_errno(r, "Failed to snapshot local image: %m");
-                        dkr_import_finish(name->import, r);
-                        return;
-                }
+        if (!i->local)
+                return 0;
 
-                log_info("Created new local image '%s'.", name->local);
+        if (!i->final_path) {
+                i->final_path = strjoin(i->image_root, "/.dkr-", i->id, NULL);
+                if (!i->final_path)
+                        return log_oom();
         }
 
-        dkr_import_finish(name->import, 0);
+        r = import_make_local_copy(i->final_path, i->image_root, i->local, i->force_local);
+        if (r < 0)
+                return r;
+
+        return 0;
 }
 
-static int dkr_import_job_run_tar(DkrImportJob *job) {
+static int dkr_import_job_on_open_disk(ImportJob *j) {
         _cleanup_close_pair_ int pipefd[2] = { -1, -1 };
-        bool gzip;
-
-        assert(job);
+        const char *base;
+        DkrImport *i;
+        int r;
 
-        /* A stream to run tar on? */
-        if (!job->temp_path)
-                return 0;
+        assert(j);
+        assert(j->userdata);
 
-        if (job->tar_stream)
-                return 0;
+        i = j->userdata;
+        assert(i->layer_job == j);
+        assert(i->final_path);
+        assert(!i->temp_path);
+        assert(i->tar_pid <= 0);
 
-        /* Maybe fork off tar, if we have enough to figure out that
-         * something is gzip compressed or not */
+        r = tempfn_random(i->final_path, &i->temp_path);
+        if (r < 0)
+                return log_oom();
 
-        if (job->payload_size < 2)
-                return 0;
+        mkdir_parents_label(i->temp_path, 0700);
 
-        /* Detect gzip signature */
-        gzip = ((uint8_t*) job->payload)[0] == 0x1f &&
-               ((uint8_t*) job->payload)[1] == 0x8b;
+        base = dkr_import_current_base_layer(i);
+        if (base) {
+                const char *base_path;
 
-        assert(!job->tar_stream);
-        assert(job->tar_pid <= 0);
+                base_path = strappenda(i->image_root, "/.dkr-", base);
+                r = btrfs_subvol_snapshot(base_path, i->temp_path, false, true);
+        } else
+                r = btrfs_subvol_make(i->temp_path);
+        if (r < 0)
+                return log_error_errno(r, "Failed to make btrfs subvolume %s: %m", i->temp_path);
 
         if (pipe2(pipefd, O_CLOEXEC) < 0)
                 return log_error_errno(errno, "Failed to create pipe for tar: %m");
 
-        job->tar_pid = fork();
-        if (job->tar_pid < 0)
+        i->tar_pid = fork();
+        if (i->tar_pid < 0)
                 return log_error_errno(errno, "Failed to fork off tar: %m");
-        if (job->tar_pid == 0) {
+        if (i->tar_pid == 0) {
                 int null_fd;
 
+                /* Child */
+
                 reset_all_signal_handlers();
                 reset_signal_mask();
                 assert_se(prctl(PR_SET_PDEATHSIG, SIGTERM) == 0);
@@ -476,8 +455,6 @@ static int dkr_import_job_run_tar(DkrImportJob *job) {
 
                 if (pipefd[0] != STDIN_FILENO)
                         safe_close(pipefd[0]);
-                if (pipefd[1] != STDIN_FILENO)
-                        safe_close(pipefd[1]);
 
                 null_fd = open("/dev/null", O_WRONLY|O_NOCTTY);
                 if (null_fd < 0) {
@@ -493,49 +470,35 @@ static int dkr_import_job_run_tar(DkrImportJob *job) {
                 if (null_fd != STDOUT_FILENO)
                         safe_close(null_fd);
 
-                execlp("tar", "tar", "-C", job->temp_path, gzip ? "-xpz" : "-px", "--numeric-owner", NULL);
+                execlp("tar", "tar", "--numeric-owner", "-C", i->temp_path, "-px", NULL);
+                log_error_errno(errno, "Failed to execute tar: %m");
                 _exit(EXIT_FAILURE);
         }
 
         pipefd[0] = safe_close(pipefd[0]);
 
-        job->tar_stream = fdopen(pipefd[1], "w");
-        if (!job->tar_stream)
-                return log_error_errno(errno, "Failed to allocate tar stream: %m");
-
+        j->disk_fd = pipefd[1];
         pipefd[1] = -1;
 
-        if (fwrite(job->payload, 1, job->payload_size, job->tar_stream) != job->payload_size)
-                return log_error_errno(errno, "Couldn't write payload: %m");
-
-        free(job->payload);
-        job->payload = NULL;
-        job->payload_size = 0;
-
         return 0;
 }
 
-static int dkr_import_name_pull_layer(DkrImportName *name) {
-        _cleanup_free_ char *path = NULL, *temp = NULL;
-        const char *url, *layer = NULL, *base = NULL;
-        char **rg;
+static int dkr_import_pull_layer(DkrImport *i) {
+        _cleanup_free_ char *path = NULL;
+        const char *url, *layer = NULL;
         int r;
 
-        assert(name);
-
-        if (name->job_layer) {
-                set_remove(name->job_layer->needed_by, name);
-                name->job_layer = NULL;
-        }
+        assert(i);
+        assert(!i->layer_job);
+        assert(!i->temp_path);
+        assert(!i->final_path);
 
         for (;;) {
-                layer = dkr_import_name_current_layer(name);
-                if (!layer) {
-                        dkr_import_name_maybe_finish(name);
-                        return 0;
-                }
+                layer = dkr_import_current_layer(i);
+                if (!layer)
+                        return 0; /* no more layers */
 
-                path = strjoin(name->import->image_root, "/.dkr-", layer, NULL);
+                path = strjoin(i->image_root, "/.dkr-", layer, NULL);
                 if (!path)
                         return log_oom();
 
@@ -548,625 +511,344 @@ static int dkr_import_name_pull_layer(DkrImportName *name) {
 
                 log_info("Layer %s already exists, skipping.", layer);
 
-                name->current_ancestry++;
+                i->current_ancestry++;
 
                 free(path);
                 path = NULL;
         }
 
-        rg = dkr_import_name_get_registries(name);
-        assert(rg && rg[0]);
+        log_info("Pulling layer %s...", layer);
 
-        url = strappenda(PROTOCOL_PREFIX, rg[0], "/v1/images/", layer, "/layer");
-        r = dkr_import_name_add_job(name, DKR_IMPORT_JOB_LAYER, url, &name->job_layer);
-        if (r < 0) {
-                log_error_errno(r, "Failed to issue HTTP request: %m");
-                return r;
-        }
-        if (r == 0) /* Already downloading this one? */
-                return 0;
+        i->final_path = path;
+        path = NULL;
 
-        log_info("Pulling layer %s...", layer);
+        url = strappenda(PROTOCOL_PREFIX, i->response_registries[0], "/v1/images/", layer, "/layer");
+        r = import_job_new(&i->layer_job, url, i->glue, i);
+        if (r < 0)
+                return log_error_errno(r, "Failed to allocate layer job: %m");
 
-        r = tempfn_random(path, &temp);
+        r = dkr_import_add_token(i, i->layer_job);
         if (r < 0)
                 return log_oom();
 
-        base = dkr_import_name_current_base_layer(name);
-        if (base) {
-                const char *base_path;
-
-                base_path = strappenda(name->import->image_root, "/.dkr-", base);
-                r = btrfs_subvol_snapshot(base_path, temp, false, true);
-        } else
-                r = btrfs_subvol_make(temp);
+        i->layer_job->on_finished = dkr_import_job_on_finished;
+        i->layer_job->on_open_disk = dkr_import_job_on_open_disk;
 
+        r = import_job_begin(i->layer_job);
         if (r < 0)
-                return log_error_errno(r, "Failed to make btrfs subvolume %s", temp);
-
-        name->job_layer->final_path = path;
-        name->job_layer->temp_path = temp;
-        path = temp = NULL;
+                return log_error_errno(r, "Failed to start layer job: %m");
 
         return 0;
 }
 
-static void dkr_import_name_job_finished(DkrImportName *name, DkrImportJob *job) {
+static void dkr_import_job_on_finished(ImportJob *j) {
+        DkrImport *i;
         int r;
 
-        assert(name);
-        assert(job);
+        assert(j);
+        assert(j->userdata);
+
+        i = j->userdata;
+        if (j->error != 0) {
+                if (j == i->images_job)
+                        log_error_errno(j->error, "Failed to retrieve images list. (Wrong index URL?)");
+                else if (j == i->tags_job)
+                        log_error_errno(j->error, "Failed to retrieve tags list.");
+                else if (j == i->ancestry_job)
+                        log_error_errno(j->error, "Failed to retrieve ancestry list.");
+                else if (j == i->json_job)
+                        log_error_errno(j->error, "Failed to retrieve json data.");
+                else
+                        log_error_errno(j->error, "Failed to retrieve layer data.");
+
+                r = j->error;
+                goto finish;
+        }
 
-        if (name->job_images == job) {
+        if (i->images_job == j) {
                 const char *url;
-                char **rg;
 
-                assert(!name->job_tags);
-                assert(!name->job_ancestry);
-                assert(!name->job_json);
-                assert(!name->job_layer);
+                assert(!i->tags_job);
+                assert(!i->ancestry_job);
+                assert(!i->json_job);
+                assert(!i->layer_job);
 
-                rg = dkr_import_name_get_registries(name);
-                if (strv_isempty(rg)) {
-                        log_error("Didn't get registry information.");
+                if (strv_isempty(i->response_registries)) {
                         r = -EBADMSG;
-                        goto fail;
+                        log_error("Didn't get registry information.");
+                        goto finish;
                 }
 
-                log_info("Index lookup succeeded, directed to registry %s.", rg[0]);
+                log_info("Index lookup succeeded, directed to registry %s.", i->response_registries[0]);
 
-                url = strappenda(PROTOCOL_PREFIX, rg[0], "/v1/repositories/", name->name, "/tags/", name->tag);
+                url = strappenda(PROTOCOL_PREFIX, i->response_registries[0], "/v1/repositories/", i->name, "/tags/", i->tag);
+                r = import_job_new(&i->tags_job, url, i->glue, i);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to allocate tags job: %m");
+                        goto finish;
+                }
+
+                r = dkr_import_add_token(i, i->tags_job);
+                if (r < 0) {
+                        log_oom();
+                        goto finish;
+                }
+
+                i->tags_job->on_finished = dkr_import_job_on_finished;
 
-                r = dkr_import_name_add_job(name, DKR_IMPORT_JOB_TAGS, url, &name->job_tags);
+                r = import_job_begin(i->tags_job);
                 if (r < 0) {
-                        log_error_errno(r, "Failed to issue HTTP request: %m");
-                        goto fail;
+                        log_error_errno(r, "Failed to start tags job: %m");
+                        goto finish;
                 }
 
-        } else if (name->job_tags == job) {
+        } else if (i->tags_job == j) {
                 const char *url;
-                char *id = NULL, **rg;
+                char *id = NULL;
 
-                assert(!name->job_ancestry);
-                assert(!name->job_json);
-                assert(!name->job_layer);
+                assert(!i->ancestry_job);
+                assert(!i->json_job);
+                assert(!i->layer_job);
 
-                r = parse_id(job->payload, job->payload_size, &id);
+                r = parse_id(j->payload, j->payload_size, &id);
                 if (r < 0) {
                         log_error_errno(r, "Failed to parse JSON id.");
-                        goto fail;
+                        goto finish;
                 }
 
-                free(name->id);
-                name->id = id;
+                free(i->id);
+                i->id = id;
 
-                rg = dkr_import_name_get_registries(name);
-                assert(rg && rg[0]);
+                log_info("Tag lookup succeeded, resolved to layer %s.", i->id);
+
+                url = strappenda(PROTOCOL_PREFIX, i->response_registries[0], "/v1/images/", i->id, "/ancestry");
+                r = import_job_new(&i->ancestry_job, url, i->glue, i);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to allocate ancestry job: %m");
+                        goto finish;
+                }
+
+                r = dkr_import_add_token(i, i->ancestry_job);
+                if (r < 0) {
+                        log_oom();
+                        goto finish;
+                }
 
-                log_info("Tag lookup succeeded, resolved to layer %s.", name->id);
+                i->ancestry_job->on_finished = dkr_import_job_on_finished;
 
-                url = strappenda(PROTOCOL_PREFIX, rg[0], "/v1/images/", name->id, "/ancestry");
-                r = dkr_import_name_add_job(name, DKR_IMPORT_JOB_ANCESTRY, url, &name->job_ancestry);
+                url = strappenda(PROTOCOL_PREFIX, i->response_registries[0], "/v1/images/", i->id, "/json");
+                r = import_job_new(&i->json_job, url, i->glue, i);
                 if (r < 0) {
-                        log_error_errno(r, "Failed to issue HTTP request: %m");
-                        goto fail;
+                        log_error_errno(r, "Failed to allocate json job: %m");
+                        goto finish;
                 }
 
-                url = strappenda(PROTOCOL_PREFIX, rg[0], "/v1/images/", name->id, "/json");
-                r = dkr_import_name_add_job(name, DKR_IMPORT_JOB_JSON, url, &name->job_json);
+                r = dkr_import_add_token(i, i->json_job);
                 if (r < 0) {
-                        log_error_errno(r, "Failed to issue HTTP request: %m");
-                        goto fail;
+                        log_oom();
+                        goto finish;
                 }
 
-        } else if (name->job_ancestry == job) {
-                char **ancestry = NULL, **i;
+                i->json_job->on_finished = dkr_import_job_on_finished;
+
+                r = import_job_begin(i->ancestry_job);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to start ancestry job: %m");
+                        goto finish;
+                }
+
+                r = import_job_begin(i->json_job);
+                if (r < 0) {
+                        log_error_errno(r, "Failed to start json job: %m");
+                        goto finish;
+                }
+
+        } else if (i->ancestry_job == j) {
+                char **ancestry = NULL, **k;
                 unsigned n;
 
-                r = parse_ancestry(job->payload, job->payload_size, &ancestry);
+                assert(!i->layer_job);
+
+                r = parse_ancestry(j->payload, j->payload_size, &ancestry);
                 if (r < 0) {
                         log_error_errno(r, "Failed to parse JSON id.");
-                        goto fail;
+                        goto finish;
                 }
 
                 n = strv_length(ancestry);
-                if (n <= 0 || !streq(ancestry[n-1], name->id)) {
+                if (n <= 0 || !streq(ancestry[n-1], i->id)) {
                         log_error("Ancestry doesn't end in main layer.");
+                        strv_free(ancestry);
                         r = -EBADMSG;
-                        goto fail;
+                        goto finish;
                 }
 
                 log_info("Ancestor lookup succeeded, requires layers:\n");
-                STRV_FOREACH(i, ancestry)
-                        log_info("\t%s", *i);
+                STRV_FOREACH(k, ancestry)
+                        log_info("\t%s", *k);
 
-                strv_free(name->ancestry);
-                name->ancestry = ancestry;
+                strv_free(i->ancestry);
+                i->ancestry = ancestry;
 
-                name->current_ancestry = 0;
-                r = dkr_import_name_pull_layer(name);
+                i->current_ancestry = 0;
+                r = dkr_import_pull_layer(i);
                 if (r < 0)
-                        goto fail;
-
-        } else if (name->job_json == job) {
+                        goto finish;
 
-                dkr_import_name_maybe_finish(name);
+        } else if (i->layer_job == j) {
+                assert(i->temp_path);
+                assert(i->final_path);
 
-        } else if (name->job_layer == job) {
-
-                name->current_ancestry ++;
-                r = dkr_import_name_pull_layer(name);
-                if (r < 0)
-                        goto fail;
+                j->disk_fd = safe_close(j->disk_fd);
 
-        } else
-                assert_not_reached("Got finished event for unknown curl object");
-
-        return;
-
-fail:
-        dkr_import_finish(name->import, r);
-}
-
-static void dkr_import_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
-        DkrImportJob *job = NULL;
-        CURLcode code;
-        DkrImportName *n;
-        long status;
-        Iterator i;
-        int r;
-
-        if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &job) != CURLE_OK)
-                return;
-
-        if (!job || job->done)
-                return;
-
-        job->done = true;
-
-        if (result != CURLE_OK) {
-                log_error("Transfer failed: %s", curl_easy_strerror(result));
-                r = -EIO;
-                goto fail;
-        }
-
-        code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
-        if (code != CURLE_OK) {
-                log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
-                r = -EIO;
-                goto fail;
-        } else if (status >= 300) {
-                log_error("HTTP request to %s failed with code %li.", job->url, status);
-                r = -EIO;
-                goto fail;
-        } else if (status < 200) {
-                log_error("HTTP request to %s finished with unexpected code %li.", job->url, status);
-                r = -EIO;
-                goto fail;
-        }
-
-        switch (job->type) {
-
-        case DKR_IMPORT_JOB_LAYER: {
-                siginfo_t si;
-
-                if (!job->tar_stream) {
-                        log_error("Downloaded layer too short.");
-                        r = -EIO;
-                        goto fail;
+                if (i->tar_pid > 0) {
+                        r = wait_for_terminate_and_warn("tar", i->tar_pid, true);
+                        i->tar_pid = 0;
+                        if (r < 0)
+                                goto finish;
                 }
 
-                fclose(job->tar_stream);
-                job->tar_stream = NULL;
-
-                assert(job->tar_pid > 0);
-
-                r = wait_for_terminate(job->tar_pid, &si);
+                r = aufs_resolve(i->temp_path);
                 if (r < 0) {
-                        log_error_errno(r, "Failed to wait for tar process: %m");
-                        goto fail;
-                }
-
-                job->tar_pid = 0;
-
-                if (si.si_code != CLD_EXITED || si.si_status != EXIT_SUCCESS) {
-                        log_error_errno(r, "tar failed abnormally.");
-                        r = -EIO;
-                        goto fail;
+                        log_error_errno(r, "Failed to resolve aufs whiteouts: %m");
+                        goto finish;
                 }
 
-                r = aufs_resolve(job->temp_path);
+                r = btrfs_subvol_set_read_only(i->temp_path, true);
                 if (r < 0) {
-                        log_error_errno(r, "Couldn't resolve aufs whiteouts: %m");
-                        goto fail;
+                        log_error_errno(r, "Failed to mark snapshort read-only: %m");
+                        goto finish;
                 }
 
-                r = btrfs_subvol_set_read_only(job->temp_path, true);
-                if (r < 0) {
-                        log_error_errno(r, "Failed to mark snapshot read-only: %m");
-                        goto fail;
-                }
-
-                if (rename(job->temp_path, job->final_path) < 0) {
-                        log_error_errno(r, "Failed to rename snapshot: %m");
-                        goto fail;
+                if (rename(i->temp_path, i->final_path) < 0) {
+                        log_error_errno(errno, "Failed to rename snaphsot: %m");
+                        goto finish;
                 }
 
-                log_info("Completed writing to layer %s", job->final_path);
-                break;
-        }
-
-        default:
-                ;
-        }
-
-        SET_FOREACH(n, job->needed_by, i)
-                dkr_import_name_job_finished(n, job);
-
-        return;
-
-fail:
-        dkr_import_finish(job->import, r);
-}
+                log_info("Completed writing to layer %s.", i->final_path);
 
-static size_t dkr_import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
-        DkrImportJob *j = userdata;
-        size_t sz = size * nmemb;
-        char *p;
-        int r;
+                i->layer_job = import_job_unref(i->layer_job);
+                free(i->temp_path);
+                i->temp_path = NULL;
+                free(i->final_path);
+                i->final_path = NULL;
 
-        assert(contents);
-        assert(j);
-
-        if (j->done) {
-                r = -ESTALE;
-                goto fail;
-        }
-
-        if (j->tar_stream) {
-                size_t l;
-
-                l = fwrite(contents, size, nmemb, j->tar_stream);
-                if (l != nmemb) {
-                        r = log_error_errno(errno, "Failed to write to tar: %m");
-                        goto fail;
-                }
-
-                return l;
-        }
-
-        if (j->payload_size + sz > PAYLOAD_MAX) {
-                log_error("Payload too large.");
-                r = -EFBIG;
-                goto fail;
-        }
+                i->current_ancestry ++;
+                r = dkr_import_pull_layer(i);
+                if (r < 0)
+                        goto finish;
 
-        p = realloc(j->payload, j->payload_size + sz);
-        if (!p) {
-                r = log_oom();
-                goto fail;
-        }
+        } else if (i->json_job != j)
+                assert_not_reached("Got finished event for unknown curl object");
 
-        memcpy(p + j->payload_size, contents, sz);
-        j->payload_size += sz;
-        j->payload = p;
+        if (!dkr_import_is_done(i))
+                return;
 
-        r = dkr_import_job_run_tar(j);
+        r = dkr_import_make_local_copy(i);
         if (r < 0)
-                goto fail;
+                goto finish;
 
-        return sz;
+        r = 0;
 
-fail:
-        dkr_import_finish(j->import, r);
-        return 0;
+finish:
+        if (i->on_finished)
+                i->on_finished(i, r, i->userdata);
+        else
+                sd_event_exit(i->event, r);
 }
 
-static size_t dkr_import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
+static int dkr_import_job_on_header(ImportJob *j, const char *header, size_t sz)  {
         _cleanup_free_ char *registry = NULL;
-        size_t sz = size * nmemb;
-        DkrImportJob *j = userdata;
         char *token;
+        DkrImport *i;
         int r;
 
-        assert(contents);
         assert(j);
+        assert(j->userdata);
 
-        if (j->done) {
-                r = -ESTALE;
-                goto fail;
-        }
+        i = j->userdata;
 
-        r = curl_header_strdup(contents, sz, HEADER_TOKEN, &token);
-        if (r < 0) {
-                log_oom();
-                goto fail;
-        }
+        r = curl_header_strdup(header, sz, HEADER_TOKEN, &token);
+        if (r < 0)
+                return log_oom();
         if (r > 0) {
-                free(j->response_token);
-                j->response_token = token;
+                free(i->response_token);
+                i->response_token = token;
+                return 0;
         }
 
-        r = curl_header_strdup(contents, sz, HEADER_REGISTRY, &registry);
-        if (r < 0) {
-                log_oom();
-                goto fail;
-        }
+        r = curl_header_strdup(header, sz, HEADER_REGISTRY, &registry);
+        if (r < 0)
+                return log_oom();
         if (r > 0) {
-                char **l, **i;
+                char **l, **k;
 
                 l = strv_split(registry, ",");
-                if (!l) {
-                        r = log_oom();
-                        goto fail;
-                }
+                if (!l)
+                        return log_oom();
 
-                STRV_FOREACH(i, l) {
-                        if (!hostname_is_valid(*i)) {
+                STRV_FOREACH(k, l) {
+                        if (!hostname_is_valid(*k)) {
                                 log_error("Registry hostname is not valid.");
                                 strv_free(l);
-                                r = -EBADMSG;
-                                goto fail;
+                                return -EBADMSG;
                         }
                 }
 
-                strv_free(j->response_registries);
-                j->response_registries = l;
+                strv_free(i->response_registries);
+                i->response_registries = l;
         }
 
-        return sz;
-
-fail:
-        dkr_import_finish(j->import, r);
         return 0;
 }
 
-static int dkr_import_name_add_job(DkrImportName *name, DkrImportJobType type, const char *url, DkrImportJob **ret) {
-        _cleanup_(dkr_import_job_unrefp) DkrImportJob *j = NULL;
-        DkrImportJob *f = NULL;
-        const char *t, *token;
+int dkr_import_pull(DkrImport *i, const char *name, const char *tag, const char *local, bool force_local) {
+        const char *url;
         int r;
 
-        assert(name);
-        assert(url);
-        assert(ret);
+        assert(i);
 
-        log_info("Getting %s.", url);
-        f = hashmap_get(name->import->jobs, url);
-        if (f) {
-                if (f->type != type)
-                        return -EINVAL;
+        if (!dkr_name_is_valid(name))
+                return -EINVAL;
 
-                r = set_put(f->needed_by, name);
-                if (r < 0)
-                        return r;
+        if (tag && !dkr_tag_is_valid(tag))
+                return -EINVAL;
 
-                return 0;
-        }
+        if (local && !machine_name_is_valid(local))
+                return -EINVAL;
 
-        r = hashmap_ensure_allocated(&name->import->jobs, &string_hash_ops);
-        if (r < 0)
-                return r;
+        if (i->images_job)
+                return -EBUSY;
 
-        j = new0(DkrImportJob, 1);
-        if (!j)
-                return -ENOMEM;
-
-        j->import = name->import;
-        j->type = type;
-        j->url = strdup(url);
-        if (!j->url)
-                return -ENOMEM;
+        if (!tag)
+                tag = "latest";
 
-        r = set_ensure_allocated(&j->needed_by, &trivial_hash_ops);
+        r = free_and_strdup(&i->local, local);
         if (r < 0)
                 return r;
+        i->force_local = force_local;
 
-        r = curl_glue_make(&j->curl, j->url, j);
+        r = free_and_strdup(&i->name, name);
         if (r < 0)
                 return r;
-
-        token = dkr_import_name_get_token(name);
-        if (token)
-                t = strappenda("Authorization: Token ", token);
-        else
-                t = HEADER_TOKEN " true";
-
-        j->request_header = curl_slist_new("Accept: application/json", t, NULL);
-        if (!j->request_header)
-                return -ENOMEM;
-
-        if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
-                return -EIO;
-
-        if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, dkr_import_job_write_callback) != CURLE_OK)
-                return -EIO;
-
-        if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
-                return -EIO;
-
-        if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, dkr_import_job_header_callback) != CURLE_OK)
-                return -EIO;
-
-        if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
-                return -EIO;
-
-        r = curl_glue_add(name->import->glue, j->curl);
+        r = free_and_strdup(&i->tag, tag);
         if (r < 0)
                 return r;
 
-        r = hashmap_put(name->import->jobs, j->url, j);
-        if (r < 0)
-                return r;
+        url = strappenda(i->index_url, "/v1/repositories/", name, "/images");
 
-        r = set_put(j->needed_by, name);
-        if (r < 0) {
-                hashmap_remove(name->import->jobs, url);
-                return r;
-        }
-
-        *ret = j;
-        j = NULL;
-
-        return 1;
-}
-
-static int dkr_import_name_begin(DkrImportName *name) {
-        const char *url;
-
-        assert(name);
-        assert(!name->job_images);
-
-        url = strappenda(name->import->index_url, "/v1/repositories/", name->name, "/images");
-
-        return dkr_import_name_add_job(name, DKR_IMPORT_JOB_IMAGES, url, &name->job_images);
-}
-
-int dkr_import_new(
-                DkrImport **import,
-                sd_event *event,
-                const char *index_url,
-                const char *image_root,
-                dkr_import_on_finished on_finished,
-                void *userdata) {
-
-        _cleanup_(dkr_import_unrefp) DkrImport *i = NULL;
-        char *e;
-        int r;
-
-        assert(import);
-        assert(dkr_url_is_valid(index_url));
-        assert(image_root);
-
-        i = new0(DkrImport, 1);
-        if (!i)
-                return -ENOMEM;
-
-        i->on_finished = on_finished;
-        i->userdata = userdata;
-
-        i->index_url = strdup(index_url);
-        if (!i->index_url)
-                return -ENOMEM;
-
-        i->image_root = strdup(image_root);
-        if (!i->image_root)
-                return -ENOMEM;
-
-        e = endswith(i->index_url, "/");
-        if (e)
-                *e = 0;
-
-        if (event)
-                i->event = sd_event_ref(event);
-        else {
-                r = sd_event_default(&i->event);
-                if (r < 0)
-                        return r;
-        }
-
-        r = curl_glue_new(&i->glue, i->event);
+        r = import_job_new(&i->images_job, url, i->glue, i);
         if (r < 0)
                 return r;
 
-        i->glue->on_finished = dkr_import_curl_on_finished;
-        i->glue->userdata = i;
-
-        *import = i;
-        i = NULL;
-
-        return 0;
-}
-
-DkrImport* dkr_import_unref(DkrImport *import) {
-        DkrImportName *n;
-        DkrImportJob *j;
-
-        if (!import)
-                return NULL;
-
-        while ((n = hashmap_steal_first(import->names)))
-               dkr_import_name_unref(n);
-        hashmap_free(import->names);
-
-        while ((j = hashmap_steal_first(import->jobs)))
-                dkr_import_job_unref(j);
-        hashmap_free(import->jobs);
-
-        curl_glue_unref(import->glue);
-        sd_event_unref(import->event);
-
-        free(import->index_url);
-        free(import->image_root);
-        free(import);
-
-        return NULL;
-}
-
-int dkr_import_cancel(DkrImport *import, const char *name) {
-        DkrImportName *n;
-
-        assert(import);
-        assert(name);
-
-        n = hashmap_remove(import->names, name);
-        if (!n)
-                return 0;
-
-        dkr_import_name_unref(n);
-        return 1;
-}
-
-int dkr_import_pull(DkrImport *import, const char *name, const char *tag, const char *local, bool force_local) {
-        _cleanup_(dkr_import_name_unrefp) DkrImportName *n = NULL;
-        int r;
-
-        assert(import);
-        assert(dkr_name_is_valid(name));
-        assert(dkr_tag_is_valid(tag));
-        assert(!local || machine_name_is_valid(local));
-
-        if (hashmap_get(import->names, name))
-                return -EEXIST;
-
-        r = hashmap_ensure_allocated(&import->names, &string_hash_ops);
+        r = dkr_import_add_token(i, i->images_job);
         if (r < 0)
                 return r;
 
-        n = new0(DkrImportName, 1);
-        if (!n)
-                return -ENOMEM;
-
-        n->import = import;
-
-        n->name = strdup(name);
-        if (!n->name)
-                return -ENOMEM;
-
-        n->tag = strdup(tag);
-        if (!n->tag)
-                return -ENOMEM;
-
-        if (local) {
-                n->local = strdup(local);
-                if (!n->local)
-                        return -ENOMEM;
-                n->force_local = force_local;
-        }
+        i->images_job->on_finished = dkr_import_job_on_finished;
+        i->images_job->on_header = dkr_import_job_on_header;
 
-        r = hashmap_put(import->names, n->name, n);
-        if (r < 0)
-                return r;
-
-        r = dkr_import_name_begin(n);
-        if (r < 0) {
-                dkr_import_cancel(import, n->name);
-                n = NULL;
-                return r;
-        }
-
-        n = NULL;
-        return 0;
+        return import_job_begin(i->images_job);
 }
 
 bool dkr_name_is_valid(const char *name) {
@@ -1199,14 +881,3 @@ bool dkr_id_is_valid(const char *id) {
 
         return true;
 }
-
-bool dkr_url_is_valid(const char *url) {
-        if (isempty(url))
-                return false;
-
-        if (!startswith(url, "http://") &&
-            !startswith(url, "https://"))
-                return false;
-
-        return ascii_is_valid(url);
-}