chiark / gitweb /
journal: add inline compression support with XZ
[elogind.git] / src / journal / journal-file.c
index 8a864cb9137034ac10c49e9e4afccf194ce3ded2..a0c479fc676983a5989664a7844e41802d4ee7a8 100644 (file)
 #include "journal-def.h"
 #include "journal-file.h"
 #include "lookup3.h"
-
-#define DEFAULT_ARENA_MAX_SIZE (16ULL*1024ULL*1024ULL*1024ULL)
-#define DEFAULT_ARENA_MIN_SIZE (256ULL*1024ULL)
-#define DEFAULT_ARENA_KEEP_FREE (1ULL*1024ULL*1024ULL)
-
-#define DEFAULT_MAX_USE (16ULL*1024ULL*1024ULL*16ULL)
+#include "compress.h"
 
 #define DEFAULT_DATA_HASH_TABLE_SIZE (2047ULL*16ULL)
 #define DEFAULT_FIELD_HASH_TABLE_SIZE (2047ULL*16ULL)
 
 #define DEFAULT_WINDOW_SIZE (128ULL*1024ULL*1024ULL)
 
+#define COMPRESSION_SIZE_THRESHOLD (64ULL)
+
 static const char signature[] = { 'L', 'P', 'K', 'S', 'H', 'H', 'R', 'H' };
 
 #define ALIGN64(x) (((x) + 7ULL) & ~7ULL)
@@ -63,6 +60,11 @@ void journal_file_close(JournalFile *f) {
                 close_nointr_nofail(f->fd);
 
         free(f->path);
+
+#ifdef HAVE_XZ
+        free(f->compress_buffer);
+#endif
+
         free(f);
 }
 
@@ -76,9 +78,6 @@ static int journal_file_init_header(JournalFile *f, JournalFile *template) {
         zero(h);
         memcpy(h.signature, signature, 8);
         h.arena_offset = htole64(ALIGN64(sizeof(h)));
-        h.arena_max_size = htole64(DEFAULT_ARENA_MAX_SIZE);
-        h.arena_min_size = htole64(DEFAULT_ARENA_MIN_SIZE);
-        h.arena_keep_free = htole64(DEFAULT_ARENA_KEEP_FREE);
 
         r = sd_id128_randomize(&h.file_id);
         if (r < 0)
@@ -129,8 +128,13 @@ static int journal_file_verify_header(JournalFile *f) {
         if (memcmp(f->header, signature, 8))
                 return -EBADMSG;
 
+#ifdef HAVE_XZ
+        if ((le64toh(f->header->incompatible_flags) & ~HEADER_INCOMPATIBLE_COMPRESSED) != 0)
+                return -EPROTONOSUPPORT;
+#else
         if (f->header->incompatible_flags != 0)
                 return -EPROTONOSUPPORT;
+#endif
 
         if ((uint64_t) f->last_stat.st_size < (le64toh(f->header->arena_offset) + le64toh(f->header->arena_size)))
                 return -ENODATA;
@@ -161,16 +165,10 @@ static int journal_file_verify_header(JournalFile *f) {
 }
 
 static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size) {
-        uint64_t asize;
         uint64_t old_size, new_size;
 
         assert(f);
 
-        if (offset < le64toh(f->header->arena_offset))
-                return -EINVAL;
-
-        new_size = PAGE_ALIGN(offset + size);
-
         /* We assume that this file is not sparse, and we know that
          * for sure, since we always call posix_fallocate()
          * ourselves */
@@ -179,12 +177,19 @@ static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size)
                 le64toh(f->header->arena_offset) +
                 le64toh(f->header->arena_size);
 
-        if (old_size >= new_size)
+        new_size = PAGE_ALIGN(offset + size);
+        if (new_size < le64toh(f->header->arena_offset))
+                new_size = le64toh(f->header->arena_offset);
+
+        if (new_size <= old_size)
                 return 0;
 
-        asize = new_size - le64toh(f->header->arena_offset);
+        if (f->metrics.max_size > 0 &&
+            new_size > f->metrics.max_size)
+                return -E2BIG;
 
-        if (asize > le64toh(f->header->arena_min_size)) {
+        if (new_size > f->metrics.min_size &&
+            f->metrics.keep_free > 0) {
                 struct statvfs svfs;
 
                 if (fstatvfs(f->fd, &svfs) >= 0) {
@@ -192,8 +197,8 @@ static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size)
 
                         available = svfs.f_bfree * svfs.f_bsize;
 
-                        if (available >= f->header->arena_keep_free)
-                                available -= f->header->arena_keep_free;
+                        if (available >= f->metrics.keep_free)
+                                available -= f->metrics.keep_free;
                         else
                                 available = 0;
 
@@ -202,16 +207,16 @@ static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size)
                 }
         }
 
-        if (asize > le64toh(f->header->arena_max_size))
-                return -E2BIG;
-
+        /* Note that the glibc fallocate() fallback is very
+           inefficient, hence we try to minimize the allocation area
+           as we can. */
         if (posix_fallocate(f->fd, old_size, new_size - old_size) < 0)
                 return -errno;
 
         if (fstat(f->fd, &f->last_stat) < 0)
                 return -errno;
 
-        f->header->arena_size = htole64(asize);
+        f->header->arena_size = new_size - htole64(f->header->arena_offset);
 
         return 0;
 }
@@ -317,7 +322,7 @@ static bool verify_hash(Object *o) {
 
         assert(o);
 
-        if (o->object.type == OBJECT_DATA) {
+        if (o->object.type == OBJECT_DATA && !(o->object.flags & OBJECT_COMPRESSED)) {
                 h1 = le64toh(o->data.hash);
                 h2 = hash64(o->data.payload, le64toh(o->object.size) - offsetof(Object, data.payload));
         } else if (o->object.type == OBJECT_FIELD) {
@@ -576,6 +581,9 @@ int journal_file_find_data_object_with_hash(
 
         osize = offsetof(Object, data.payload) + size;
 
+        if (f->header->data_hash_table_size == 0)
+                return -EBADMSG;
+
         h = hash % (le64toh(f->header->data_hash_table_size) / sizeof(HashItem));
         p = le64toh(f->data_hash_table[h].head_hash_offset);
 
@@ -586,12 +594,40 @@ int journal_file_find_data_object_with_hash(
                 if (r < 0)
                         return r;
 
-                if (le64toh(o->object.size) == osize &&
-                    memcmp(o->data.payload, data, size) == 0) {
+                if (le64toh(o->data.hash) != hash)
+                        return -EBADMSG;
 
-                        if (le64toh(o->data.hash) != hash)
+                if (o->object.flags & OBJECT_COMPRESSED) {
+#ifdef HAVE_XZ
+                        uint64_t l, rsize;
+
+                        l = le64toh(o->object.size);
+                        if (l <= offsetof(Object, data.payload))
                                 return -EBADMSG;
 
+                        l -= offsetof(Object, data.payload);
+
+                        if (!uncompress_blob(o->data.payload, l, &f->compress_buffer, &f->compress_buffer_size, &rsize))
+                                return -EBADMSG;
+
+                        if (rsize == size &&
+                            memcmp(f->compress_buffer, data, size) == 0) {
+
+                                if (ret)
+                                        *ret = o;
+
+                                if (offset)
+                                        *offset = p;
+
+                                return 1;
+                        }
+#else
+                        return -EPROTONOSUPPORT;
+#endif
+
+                } else if (le64toh(o->object.size) == osize &&
+                           memcmp(o->data.payload, data, size) == 0) {
+
                         if (ret)
                                 *ret = o;
 
@@ -629,6 +665,7 @@ static int journal_file_append_data(JournalFile *f, const void *data, uint64_t s
         uint64_t osize;
         Object *o;
         int r;
+        bool compressed = false;
 
         assert(f);
         assert(data || size == 0);
@@ -655,7 +692,27 @@ static int journal_file_append_data(JournalFile *f, const void *data, uint64_t s
                 return r;
 
         o->data.hash = htole64(hash);
-        memcpy(o->data.payload, data, size);
+
+#ifdef HAVE_XZ
+        if (f->compress &&
+            size >= COMPRESSION_SIZE_THRESHOLD) {
+                uint64_t rsize;
+
+                compressed = compress_blob(data, size, o->data.payload, &rsize);
+
+                if (compressed) {
+                        o->object.size = htole64(offsetof(Object, data.payload) + rsize);
+                        o->object.flags |= OBJECT_COMPRESSED;
+
+                        f->header->incompatible_flags = htole32(le32toh(f->header->incompatible_flags) | HEADER_INCOMPATIBLE_COMPRESSED);
+
+                        log_debug("Compressed data object %lu -> %lu", (unsigned long) size, (unsigned long) rsize);
+                }
+        }
+#endif
+
+        if (!compressed)
+                memcpy(o->data.payload, data, size);
 
         r = journal_file_link_data(f, o, p, hash);
         if (r < 0)
@@ -816,7 +873,7 @@ static int journal_file_link_entry(JournalFile *f, Object *o, uint64_t offset) {
         if (r < 0)
                 return r;
 
-        log_error("%s %lu", f->path, (unsigned long) f->header->n_entries);
+        log_error("=> %s seqnr=%lu n_entries=%lu", f->path, (unsigned long) o->entry.seqnum, (unsigned long) f->header->n_entries);
 
         if (f->header->head_entry_realtime == 0)
                 f->header->head_entry_realtime = o->entry.realtime;
@@ -887,6 +944,8 @@ static void journal_file_post_change(JournalFile *f) {
          * trigger IN_MODIFY by truncating the journal file to its
          * current size which triggers IN_MODIFY. */
 
+        __sync_synchronize();
+
         if (ftruncate(f->fd, f->last_stat.st_size) < 0)
                 log_error("Failed to to truncate file to its own size: %m");
 }
@@ -1588,6 +1647,9 @@ void journal_file_dump(JournalFile *f) {
                         break;
                 }
 
+                if (o->object.flags & OBJECT_COMPRESSED)
+                        printf("Flags: COMPRESSED\n");
+
                 if (p == le64toh(f->header->tail_object_offset))
                         p = 0;
                 else
@@ -1626,6 +1688,10 @@ int journal_file_open(
         f->writable = (flags & O_ACCMODE) != O_RDONLY;
         f->prot = prot_from_flags(flags);
 
+        f->metrics.max_size = DEFAULT_MAX_SIZE;
+        f->metrics.min_size = DEFAULT_MIN_SIZE;
+        f->metrics.keep_free = DEFAULT_KEEP_FREE;
+
         f->path = strdup(fname);
         if (!f->path) {
                 r = -ENOMEM;