chiark / gitweb /
journal-upload: use journal as the source
[elogind.git] / src / journal-remote / journal-upload-journal.c
diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c
new file mode 100644 (file)
index 0000000..a3be1bf
--- /dev/null
@@ -0,0 +1,402 @@
+#include <stdbool.h>
+
+#include <curl/curl.h>
+
+#include "util.h"
+#include "log.h"
+#include "utf8.h"
+#include "journal-upload.h"
+
+/**
+ * Write up to size bytes to buf. Return negative on error, and number of
+ * bytes written otherwise. The last case is a kind of an error too.
+ */
+static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
+        int r;
+        size_t pos = 0;
+
+        assert(size <= SSIZE_MAX);
+
+        while (true) {
+
+                switch(u->entry_state) {
+                case ENTRY_CURSOR: {
+                        free(u->last_cursor);
+                        u->last_cursor = NULL;
+
+                        r = sd_journal_get_cursor(u->journal, &u->last_cursor);
+                        if (r < 0) {
+                                log_error("Failed to get cursor: %s", strerror(-r));
+                                return r;
+                        }
+
+                        r = snprintf(buf + pos, size - pos,
+                                     "__CURSOR=%s\n", u->last_cursor);
+                        if (pos + r > size)
+                                /* not enough space */
+                                return pos;
+
+                        u->entry_state ++;
+
+                        if (pos + r == size) {
+                                /* exactly one character short, but we don't need it */
+                                buf[size - 1] = '\n';
+                                return size;
+                        }
+
+                        pos += r;
+                }       /* fall through */
+
+                case ENTRY_REALTIME: {
+                        usec_t realtime;
+
+                        r = sd_journal_get_realtime_usec(u->journal, &realtime);
+                        if (r < 0) {
+                                log_error("Failed to get realtime timestamp: %s", strerror(-r));
+                                return r;
+                        }
+
+                        r = snprintf(buf + pos, size - pos,
+                                     "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
+                        if (r + pos > size)
+                                /* not enough space */
+                                return pos;
+
+                        u->entry_state ++;
+
+                        if (r + pos == size) {
+                                /* exactly one character short, but we don't need it */
+                                buf[size - 1] = '\n';
+                                return size;
+                        }
+
+                        pos += r;
+                }       /* fall through */
+
+                case ENTRY_MONOTONIC: {
+                        usec_t monotonic;
+                        sd_id128_t boot_id;
+
+                        r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
+                        if (r < 0) {
+                                log_error("Failed to get monotonic timestamp: %s", strerror(-r));
+                                return r;
+                        }
+
+                        r = snprintf(buf + pos, size - pos,
+                                     "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
+                        if (r + pos > size)
+                                /* not enough space */
+                                return pos;
+
+                        u->entry_state ++;
+
+                        if (r + pos == size) {
+                                /* exactly one character short, but we don't need it */
+                                buf[size - 1] = '\n';
+                                return size;
+                        }
+
+                        pos += r;
+                }       /* fall through */
+
+                case ENTRY_BOOT_ID: {
+                        sd_id128_t boot_id;
+                        char sid[33];
+
+                        r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
+                        if (r < 0) {
+                                log_error("Failed to get monotonic timestamp: %s", strerror(-r));
+                                return r;
+                        }
+
+                        r = snprintf(buf + pos, size - pos,
+                                     "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
+                        if (r + pos> size)
+                                /* not enough space */
+                                return pos;
+
+                        u->entry_state ++;
+
+                        if (r + pos == size) {
+                                /* exactly one character short, but we don't need it */
+                                buf[size - 1] = '\n';
+                                return size;
+                        }
+
+                        pos += r;
+                }       /* fall through */
+
+                case ENTRY_NEW_FIELD: {
+                        u->field_pos = 0;
+
+                        r = sd_journal_enumerate_data(u->journal,
+                                                      &u->field_data,
+                                                      &u->field_length);
+                        if (r < 0) {
+                                log_error("Failed to move to next field in entry: %s",
+                                          strerror(-r));
+                                return r;
+                        } else if (r == 0) {
+                                u->entry_state = ENTRY_OUTRO;
+                                continue;
+                        }
+
+                        if (!utf8_is_printable_newline(u->field_data,
+                                                       u->field_length, false)) {
+                                u->entry_state = ENTRY_BINARY_FIELD_START;
+                                continue;
+                        }
+
+                        u->entry_state ++;
+                }       /* fall through */
+
+                case ENTRY_TEXT_FIELD:
+                case ENTRY_BINARY_FIELD: {
+                        bool done;
+                        size_t tocopy;
+
+                        done = size - pos > u->field_length - u->field_pos;
+                        if (done)
+                                tocopy = u->field_length - u->field_pos;
+                        else
+                                tocopy = size - pos;
+
+                        memcpy(buf + pos,
+                               (char*) u->field_data + u->field_pos,
+                               tocopy);
+
+                        if (done) {
+                                buf[pos + tocopy] = '\n';
+                                pos += tocopy + 1;
+                                u->entry_state = ENTRY_NEW_FIELD;
+                                continue;
+                        } else {
+                                u->field_pos += tocopy;
+                                return size;
+                        }
+                }
+
+                case ENTRY_BINARY_FIELD_START: {
+                        const char *c;
+                        size_t len;
+
+                        c = memchr(u->field_data, '=', u->field_length);
+                        if (!c || c == u->field_data) {
+                                log_error("Invalid field.");
+                                return -EINVAL;
+                        }
+
+                        len = c - (const char*)u->field_data;
+
+                        /* need space for label + '\n' */
+                        if (size - pos < len + 1)
+                                return pos;
+
+                        memcpy(buf + pos, u->field_data, len);
+                        buf[pos + len] = '\n';
+                        pos += len + 1;
+
+                        u->field_pos = len + 1;
+                        u->entry_state ++;
+                }       /* fall through */
+
+                case ENTRY_BINARY_FIELD_SIZE: {
+                        uint64_t le64;
+
+                        /* need space for uint64_t */
+                        if (size - pos < 8)
+                                return pos;
+
+                        le64 = htole64(u->field_length - u->field_pos);
+                        memcpy(buf + pos, &le64, 8);
+                        pos += 8;
+
+                        u->entry_state ++;
+                        continue;
+                }
+
+                case ENTRY_OUTRO:
+                        /* need space for '\n' */
+                        if (size - pos < 1)
+                                return pos;
+
+                        buf[pos++] = '\n';
+                        u->entry_state ++;
+                        u->entries_sent ++;
+
+                        return pos;
+
+                default:
+                        assert_not_reached("WTF?");
+                }
+        }
+        assert_not_reached("WTF?");
+}
+
+static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
+        Uploader *u = userp;
+        int r;
+        sd_journal *j;
+        size_t filled = 0;
+        ssize_t w;
+
+        assert(u);
+        assert(nmemb <= SSIZE_MAX / size);
+
+        j = u->journal;
+
+        while (j && filled < size * nmemb) {
+                if (u->entry_state == ENTRY_DONE) {
+                        r = sd_journal_next(j);
+                        if (r < 0) {
+                                log_error("Failed to move to next entry in journal: %s",
+                                          strerror(-r));
+                                return CURL_READFUNC_ABORT;
+                        } else if (r == 0) {
+                                if (u->input_event)
+                                        log_debug("No more entries, waiting for journal.");
+                                else {
+                                        log_info("No more entries, closing journal.");
+                                        close_journal_input(u);
+                                }
+
+                                u->uploading = false;
+
+                                break;
+                        }
+
+                        u->entry_state = ENTRY_CURSOR;
+                }
+
+                w = write_entry((char*)buf + filled, size * nmemb - filled, u);
+                if (w < 0)
+                        return CURL_READFUNC_ABORT;
+                filled += w;
+
+                if (filled == 0) {
+                        log_error("Buffer space is too small to write entry.");
+                        return CURL_READFUNC_ABORT;
+                } else if (u->entry_state != ENTRY_DONE)
+                        /* This means that all available space was used up */
+                        break;
+
+                log_debug("Entry %zu (%s) has been uploaded.",
+                          u->entries_sent, u->last_cursor);
+        }
+
+        return filled;
+}
+
+void close_journal_input(Uploader *u) {
+        assert(u);
+
+        if (u->journal) {
+                log_debug("Closing journal input.");
+
+                sd_journal_close(u->journal);
+                u->journal = NULL;
+        }
+        u->timeout = 0;
+}
+
+static int process_journal_input(Uploader *u, int skip) {
+        int r;
+
+        r = sd_journal_next_skip(u->journal, skip);
+        if (r < 0) {
+                log_error("Failed to skip to next entry: %s", strerror(-r));
+                return r;
+        } else if (r < skip)
+                return 0;
+
+        /* have data */
+        u->entry_state = ENTRY_CURSOR;
+        return start_upload(u, journal_input_callback, u);
+}
+
+int check_journal_input(Uploader *u) {
+        if (u->input_event) {
+                int r;
+
+                r = sd_journal_process(u->journal);
+                if (r < 0) {
+                        log_error("Failed to process journal: %s", strerror(-r));
+                        close_journal_input(u);
+                        return r;
+                }
+
+                if (r == SD_JOURNAL_NOP)
+                        return 0;
+        }
+
+        return process_journal_input(u, 1);
+}
+
+static int dispatch_journal_input(sd_event_source *event,
+                                  int fd,
+                                  uint32_t revents,
+                                  void *userp) {
+        Uploader *u = userp;
+
+        assert(u);
+
+        if (u->uploading) {
+                log_warning("dispatch_journal_input called when uploading, ignoring.");
+                return 0;
+        }
+
+        log_debug("Detected journal input, checking for new data.");
+        return check_journal_input(u);
+}
+
+int open_journal_for_upload(Uploader *u,
+                            sd_journal *j,
+                            const char *cursor,
+                            bool after_cursor,
+                            bool follow) {
+        int fd, r, events;
+
+        u->journal = j;
+
+        sd_journal_set_data_threshold(j, 0);
+
+        if (follow) {
+                fd = sd_journal_get_fd(j);
+                if (fd < 0) {
+                        log_error("sd_journal_get_fd failed: %s", strerror(-fd));
+                        return fd;
+                }
+
+                events = sd_journal_get_events(j);
+
+                r = sd_journal_reliable_fd(j);
+                assert(r >= 0);
+                if (r > 0)
+                        u->timeout = -1;
+                else
+                        u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
+
+                r = sd_event_add_io(u->events, &u->input_event,
+                                    fd, events, dispatch_journal_input, u);
+                if (r < 0) {
+                        log_error("Failed to register input event: %s", strerror(-r));
+                        return r;
+                }
+
+                log_debug("Listening for journal events on fd:%d, timeout %d",
+                          fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
+        } else
+                log_debug("Not listening for journal events.");
+
+        if (cursor) {
+                r = sd_journal_seek_cursor(j, cursor);
+                if (r < 0) {
+                        log_error("Failed to seek to cursor %s: %s",
+                                  cursor, strerror(-r));
+                        return r;
+                }
+        }
+
+        return process_journal_input(u, 1 + !!after_cursor);
+}