chiark / gitweb /
journal: Add sync timer to journal server
author Oleksii Shevchuk <alxchk@gmail.com>
Mon, 25 Mar 2013 16:49:03 +0000 (18:49 +0200)
committer Lennart Poettering <lennart@poettering.net>
Mon, 25 Mar 2013 16:51:06 +0000 (17:51 +0100)
Add an option to periodically force a journal sync with fsync(). The default
interval is 5 minutes; it is configured via the SyncIntervalSec= option in
journald.conf. Synced journal files are marked as OFFLINE.

A manual sync can be triggered by sending SIGUSR1.

man/journald.conf.xml
src/journal/journal-file.c
src/journal/journal-file.h
src/journal/journald-gperf.gperf
src/journal/journald-server.c
src/journal/journald-server.h
src/journal/journald.conf

index 0797deb1155e4e22bdfa75934fa2553e2ac3ab10..0b9de65fe3bb54670d111d0e471d099d501d8dfb 100644 (file)
                                 seconds. </para></listitem>
                         </varlistentry>
 
+
+                        <varlistentry>
+                                <term><varname>SyncIntervalSec=</varname></term>
+
+                                <listitem><para>The timeout before journal data
+                                is synced to disk. After syncing, journal files are
+                                in the OFFLINE state. The default timeout is 5 minutes.
+                                </para></listitem>
+                        </varlistentry>
+
                         <varlistentry>
                                 <term><varname>ForwardToSyslog=</varname></term>
                                 <term><varname>ForwardToKMsg=</varname></term>
index 13fc8edea9ff25c923d819924b662cca35c22ea9..5b077be0da4748771fb36e83f2b96ede4954cb39 100644 (file)
 /* How many entries to keep in the entry array chain cache at max */
 #define CHAIN_CACHE_MAX 20
 
+/* Switches a writable journal file to the ONLINE state.
+ *
+ * Returns 0 if the file is ONLINE afterwards (it either already was, or it
+ * was OFFLINE and has been flipped), -EPERM if the file is not writable, and
+ * -EINVAL if there is no valid fd or no header, or if the header is in any
+ * state other than ONLINE/OFFLINE. */
+int journal_file_set_online(JournalFile *f) {
+        assert(f);
+
+        if (!f->writable)
+                return -EPERM;
+
+        if (f->fd < 0 || !f->header)
+                return -EINVAL;
+
+        /* Nothing to do if we are online already */
+        if (f->header->state == STATE_ONLINE)
+                return 0;
+
+        /* Only OFFLINE files may be brought online; anything else is refused */
+        if (f->header->state != STATE_OFFLINE)
+                return -EINVAL;
+
+        f->header->state = STATE_ONLINE;
+
+        /* Best effort: the result of fsync() is not checked */
+        fsync(f->fd);
+
+        return 0;
+}
+
+/* Syncs a writable journal file and marks it OFFLINE.
+ *
+ * Returns -EPERM if the file is not writable and -EINVAL if there is no
+ * valid fd or no header. Otherwise returns 0; files that are not currently
+ * ONLINE are left untouched, so any other state is never overridden. */
+int journal_file_set_offline(JournalFile *f) {
+        assert(f);
+
+        if (!f->writable)
+                return -EPERM;
+
+        if (f->fd < 0 || !f->header)
+                return -EINVAL;
+
+        if (f->header->state == STATE_ONLINE) {
+                /* Sync once before and once after flipping the state --
+                 * presumably so that the OFFLINE marker cannot reach the disk
+                 * ahead of the data written before it. The results of
+                 * fsync() are not checked. */
+                fsync(f->fd);
+                f->header->state = STATE_OFFLINE;
+                fsync(f->fd);
+        }
+
+        return 0;
+}
+
 void journal_file_close(JournalFile *f) {
         assert(f);
 
@@ -81,16 +125,10 @@ void journal_file_close(JournalFile *f) {
         if (f->mmap && f->fd >= 0)
                 mmap_cache_close_fd(f->mmap, f->fd);
 
-        if (f->writable && f->fd >= 0)
-                fdatasync(f->fd);
-
-        if (f->header) {
-                /* Mark the file offline. Don't override the archived state if it already is set */
-                if (f->writable && f->header->state == STATE_ONLINE)
-                        f->header->state = STATE_OFFLINE;
+        journal_file_set_offline(f);
 
+        if (f->header)
                 munmap(f->header, PAGE_ALIGN(sizeof(Header)));
-        }
 
         if (f->fd >= 0)
                 close_nointr_nofail(f->fd);
@@ -177,7 +215,7 @@ static int journal_file_refresh_header(JournalFile *f) {
 
         f->header->boot_id = boot_id;
 
-        f->header->state = STATE_ONLINE;
+        journal_file_set_online(f);
 
         /* Sync the online state to disk */
         msync(f->header, PAGE_ALIGN(sizeof(Header)), MS_SYNC);
@@ -457,6 +495,10 @@ int journal_file_append_object(JournalFile *f, int type, uint64_t size, Object *
         assert(offset);
         assert(ret);
 
+        r = journal_file_set_online(f);
+        if (r < 0)
+                return r;
+
         p = le64toh(f->header->tail_object_offset);
         if (p == 0)
                 p = le64toh(f->header->header_size);
@@ -1267,9 +1309,6 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st
         assert(f);
         assert(iovec || n_iovec == 0);
 
-        if (!f->writable)
-                return -EPERM;
-
         if (!ts) {
                 dual_timestamp_get(&_ts);
                 ts = &_ts;
index cdbc8e41f615e9989e00e0ed0746ff15e30776dc..0eab5017d76b60fee5affa4528b7d3fd0a6541ee 100644 (file)
@@ -106,6 +106,8 @@ int journal_file_open(
                 JournalFile *template,
                 JournalFile **ret);
 
+int journal_file_set_offline(JournalFile *f);
+int journal_file_set_online(JournalFile *f);
 void journal_file_close(JournalFile *j);
 
 int journal_file_open_reliably(
index 1baef1411c96ccd34a7199640f71e885eec4e989..57b45f9232c54802f9236bade90ba6773688aff7 100644 (file)
@@ -18,6 +18,7 @@ struct ConfigPerfItem;
 Journal.Storage,            config_parse_storage,   0, offsetof(Server, storage)
 Journal.Compress,           config_parse_bool,      0, offsetof(Server, compress)
 Journal.Seal,               config_parse_bool,      0, offsetof(Server, seal)
+Journal.SyncIntervalSec,    config_parse_usec,      0, offsetof(Server, sync_interval_usec)
 Journal.RateLimitInterval,  config_parse_usec,      0, offsetof(Server, rate_limit_interval)
 Journal.RateLimitBurst,     config_parse_unsigned,  0, offsetof(Server, rate_limit_burst)
 Journal.SystemMaxUse,       config_parse_bytes_off, 0, offsetof(Server, system_metrics.max_use)
index 855430a6ba285494e734ac9072b6f51a57bcd78a..a9d7aa181d252887b4eee3d6a61203946cf54429 100644 (file)
@@ -24,6 +24,7 @@
 #include <linux/sockios.h>
 #include <sys/statvfs.h>
 #include <sys/mman.h>
+#include <sys/timerfd.h>
 
 #include <libudev.h>
 #include <systemd/sd-journal.h>
@@ -67,6 +68,7 @@
 
 #define USER_JOURNALS_MAX 1024
 
+#define DEFAULT_SYNC_INTERVAL_USEC (5*USEC_PER_MINUTE)
 #define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
 #define DEFAULT_RATE_LIMIT_BURST 200
 
@@ -344,6 +346,33 @@ void server_rotate(Server *s) {
         }
 }
 
+/* Puts the system journal and every user journal offline (which fsync()s
+ * them, see journal_file_set_offline()), then disarms the sync timer and
+ * clears the "sync scheduled" flag, so that the next successful write can
+ * schedule a new sync via server_schedule_sync().
+ *
+ * Failures are logged but otherwise ignored. */
+void server_sync(Server *s) {
+        JournalFile *f;
+        void *k;
+        Iterator i;
+        int r;
+
+        /* An all-zero it_value disarms a timerfd */
+        static const struct itimerspec sync_timer_disable = {};
+
+        assert(s);
+
+        if (s->system_journal) {
+                r = journal_file_set_offline(s->system_journal);
+                if (r < 0)
+                        log_error("Failed to sync system journal: %s", strerror(-r));
+        }
+
+        HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
+                r = journal_file_set_offline(f);
+                if (r < 0)
+                        log_error("Failed to sync user journal: %s", strerror(-r));
+        }
+
+        r = timerfd_settime(s->sync_timer_fd, 0, &sync_timer_disable, NULL);
+        if (r < 0)
+                log_error("Failed to disable sync timer: %m");
+
+        /* Cleared even if disarming failed, so that syncs can be scheduled again */
+        s->sync_scheduled = false;
+}
+
 void server_vacuum(Server *s) {
         char *p;
         char ids[33];
@@ -475,8 +504,10 @@ static void write_to_journal(Server *s, uid_t uid, struct iovec *iovec, unsigned
         }
 
         r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
-        if (r >= 0)
+        if (r >= 0) {
+                server_schedule_sync(s);
                 return;
+        }
 
         if (vacuumed || !shall_try_append_again(f, r)) {
                 log_error("Failed to write entry, ignoring: %s", strerror(-r));
@@ -990,11 +1021,10 @@ int process_event(Server *s, struct epoll_event *ev) {
                         return -errno;
                 }
 
-                log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
-
                 if (sfsi.ssi_signo == SIGUSR1) {
                         touch("/run/systemd/journal/flushed");
                         server_flush_to_var(s);
+                        server_sync(s);
                         return 1;
                 }
 
@@ -1004,8 +1034,23 @@ int process_event(Server *s, struct epoll_event *ev) {
                         return 1;
                 }
 
+                log_info("Received SIG%s", signal_to_string(sfsi.ssi_signo));
+
                 return 0;
 
+        } else if (ev->data.fd == s->sync_timer_fd) {
+                int r;
+                uint64_t t;
+
+                log_debug("Got sync request from epoll.");
+
+                r = read(ev->data.fd, (void *)&t, sizeof(t));
+                if (r < 0)
+                        return 0;
+
+                server_sync(s);
+                return 1;
+
         } else if (ev->data.fd == s->dev_kmsg_fd) {
                 int r;
 
@@ -1285,16 +1330,68 @@ static int server_parse_config_file(Server *s) {
         return r;
 }
 
+/* Creates the timerfd used for journal syncs (CLOCK_MONOTONIC, close-on-exec)
+ * and registers it for EPOLLIN on the server's epoll fd. This function does
+ * not arm the timer; that is done by server_schedule_sync().
+ *
+ * Returns 0 on success, or a negative errno-style value if the timerfd could
+ * not be created or added to epoll. In the latter case the fd stays in
+ * s->sync_timer_fd and is closed by server_done(). */
+static int server_open_sync_timer(Server *s) {
+        int r;
+        struct epoll_event ev;
+
+        assert(s);
+
+        s->sync_timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
+        if (s->sync_timer_fd < 0)
+                return -errno;
+
+        zero(ev);
+        ev.events = EPOLLIN;
+        ev.data.fd = s->sync_timer_fd;
+
+        r = epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->sync_timer_fd, &ev);
+        if (r < 0) {
+                log_error("Failed to add sync timer fd to epoll object: %m");
+                return -errno;
+        }
+
+        return 0;
+}
+
+/* Arms the sync timer as a one-shot that expires sync_interval_usec from
+ * now, unless a sync is already scheduled. When the timer fires,
+ * process_event() calls server_sync(), which clears the flag again.
+ *
+ * If sync_interval_usec is 0 no timer is armed, but the flag is still set;
+ * in that case only an explicit server_sync() resets it.
+ *
+ * Returns 0 on success (including when nothing had to be done), or a
+ * negative errno-style value if the timer could not be armed. */
+int server_schedule_sync(Server *s) {
+        int r;
+
+        struct itimerspec sync_timer_enable;
+
+        assert(s);
+
+        if (s->sync_scheduled)
+                return 0;
+
+        if (s->sync_interval_usec) {
+                zero(sync_timer_enable);
+                sync_timer_enable.it_value.tv_sec = s->sync_interval_usec / USEC_PER_SEC;
+                /* Sub-second remainder in usec, converted to nsec (1 usec = 1000 nsec) */
+                sync_timer_enable.it_value.tv_nsec = (s->sync_interval_usec % USEC_PER_SEC) * 1000;
+
+                r = timerfd_settime(s->sync_timer_fd, 0, &sync_timer_enable, NULL);
+                if (r < 0)
+                        return -errno;
+        }
+
+        s->sync_scheduled = true;
+
+        return 0;
+}
+
 int server_init(Server *s) {
         int n, r, fd;
 
         assert(s);
 
         zero(*s);
-        s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = s->dev_kmsg_fd = -1;
+        s->sync_timer_fd = s->syslog_fd = s->native_fd = s->stdout_fd =
+            s->signal_fd = s->epoll_fd = s->dev_kmsg_fd = -1;
         s->compress = true;
         s->seal = true;
 
+        s->sync_interval_usec = DEFAULT_SYNC_INTERVAL_USEC;
+        s->sync_scheduled = false;
+
         s->rate_limit_interval = DEFAULT_RATE_LIMIT_INTERVAL;
         s->rate_limit_burst = DEFAULT_RATE_LIMIT_BURST;
 
@@ -1394,6 +1491,10 @@ int server_init(Server *s) {
         if (r < 0)
                 return r;
 
+        r = server_open_sync_timer(s);
+        if (r < 0)
+                return r;
+
         r = open_signalfd(s);
         if (r < 0)
                 return r;
@@ -1466,6 +1567,9 @@ void server_done(Server *s) {
         if (s->dev_kmsg_fd >= 0)
                 close_nointr_nofail(s->dev_kmsg_fd);
 
+        if (s->sync_timer_fd >= 0)
+                close_nointr_nofail(s->sync_timer_fd);
+
         if (s->rate_limit)
                 journal_rate_limit_free(s->rate_limit);
 
index 9f50a29e50c960f8d634f7ca7ef47de13fc7ab71..21edd6bdaf5141b1cfd4a8696238ab12b6076ce5 100644 (file)
@@ -71,6 +71,7 @@ typedef struct Server {
         size_t buffer_size;
 
         JournalRateLimit *rate_limit;
+        usec_t sync_interval_usec;
         usec_t rate_limit_interval;
         unsigned rate_limit_burst;
 
@@ -119,6 +120,9 @@ typedef struct Server {
         uint64_t *kernel_seqnum;
 
         struct udev *udev;
+
+        int sync_timer_fd;
+        bool sync_scheduled;
 } Server;
 
 #define N_IOVEC_META_FIELDS 17
@@ -145,8 +149,10 @@ void server_fix_perms(Server *s, JournalFile *f, uid_t uid);
 bool shall_try_append_again(JournalFile *f, int r);
 int server_init(Server *s);
 void server_done(Server *s);
+void server_sync(Server *s);
 void server_vacuum(Server *s);
 void server_rotate(Server *s);
+int server_schedule_sync(Server *s);
 int server_flush_to_var(Server *s);
 int process_event(Server *s, struct epoll_event *ev);
 void server_maybe_append_tags(Server *s);
index 948318bc6267b808a38320aa0671e48fca66fd71..541047720176094c931a262664ac2f99f66ea9dd 100644 (file)
@@ -12,6 +12,7 @@
 #Compress=yes
 #Seal=yes
 #SplitMode=login
+#SyncIntervalSec=5m
 #RateLimitInterval=10s
 #RateLimitBurst=200
 #SystemMaxUse=