From fe6521272ba203ec8f0d5a94f0729960b3f90525 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 23 Dec 2011 20:50:48 +0100 Subject: [PATCH] journal: implement stdout transport --- src/journal/journal-send.c | 60 ++++ src/journal/journald.c | 573 +++++++++++++++++++++++++++++++++---- src/journal/sd-journal.h | 3 +- src/stdout-syslog-bridge.c | 6 +- src/util.c | 3 +- 5 files changed, 581 insertions(+), 64 deletions(-) diff --git a/src/journal/journal-send.c b/src/journal/journal-send.c index 238d64c13..cc3cd8c30 100644 --- a/src/journal/journal-send.c +++ b/src/journal/journal-send.c @@ -26,6 +26,7 @@ #include "sd-journal.h" #include "util.h" +#include "socket-util.h" /* We open a single fd, and we'll share it with the current process, * all its threads, and all its subprocesses. This means we need to @@ -67,6 +68,12 @@ int sd_journal_printv(int priority, const char *format, va_list ap) { char buffer[8 + LINE_MAX], p[11]; struct iovec iov[2]; + if (priority < 0 || priority > 7) + return -EINVAL; + + if (!format) + return -EINVAL; + snprintf(p, sizeof(p), "PRIORITY=%i", priority & LOG_PRIMASK); char_array_0(p); @@ -197,3 +204,56 @@ int sd_journal_sendv(const struct iovec *iov, int n) { return 0; } + +int sd_journal_stream_fd(const char *tag, int priority, int priority_prefix) { + union sockaddr_union sa; + int fd; + char *header; + size_t l; + ssize_t r; + + if (priority < 0 || priority > 7) + return -EINVAL; + + fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (fd < 0) + return -errno; + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path)); + + r = connect(fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + close_nointr_nofail(fd); + return -errno; + } + + if (!tag) + tag = ""; + + l = strlen(tag); + header = alloca(l + 1 + 2 + 2 + 2); + + memcpy(header, tag, l); + header[l++] = '\n'; + header[l++] = '0' + priority; + header[l++] = '\n'; + header[l++] = '0' + !!priority_prefix; + header[l++] = '\n'; + header[l++] = '0'; + header[l++] = '\n'; + + r = loop_write(fd, header, l, false); + if (r < 0) { + close_nointr_nofail(fd); + return (int) r; + } + + if ((size_t) r != l) { + close_nointr_nofail(fd); + return -errno; + } + + return fd; +} diff --git a/src/journal/journald.c b/src/journal/journald.c index d35e1c119..5d3956ea4 100644 --- a/src/journal/journald.c +++ b/src/journal/journald.c @@ -37,14 +37,19 @@ #include "socket-util.h" #include "acl-util.h" #include "cgroup-util.h" +#include "list.h" #define USER_JOURNALS_MAX 1024 +#define STDOUT_STREAMS_MAX 4096 + +typedef struct StdoutStream StdoutStream; typedef struct Server { int epoll_fd; int signal_fd; int syslog_fd; int native_fd; + int stdout_fd; JournalFile *runtime_journal; JournalFile *system_journal; @@ -58,8 +63,38 @@ typedef struct Server { JournalMetrics metrics; uint64_t max_use; bool compress; + + LIST_HEAD(StdoutStream, stdout_streams); + unsigned n_stdout_streams; } Server; +typedef enum StdoutStreamState { + STDOUT_STREAM_TAG, + STDOUT_STREAM_PRIORITY, + STDOUT_STREAM_PRIORITY_PREFIX, + STDOUT_STREAM_TEE_CONSOLE, + STDOUT_STREAM_RUNNING +} StdoutStreamState; + +struct StdoutStream { + Server *server; + StdoutStreamState state; + + int fd; + + struct ucred ucred; + + char *tag; + int priority; + bool priority_prefix:1; + bool tee_console:1; + + char buffer[LINE_MAX+1]; + size_t length; + + LIST_FIELDS(StdoutStream, stdout_stream); +}; + static void fix_perms(JournalFile *f, uid_t uid) { acl_t acl; acl_entry_t entry; @@ -363,7 +398,6 @@ retry: free(audit_session); free(audit_loginuid); free(cgroup); - } static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) { @@ -551,18 +585,322 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ free(iovec[j].iov_base); } -static int process_event(Server *s, struct epoll_event *ev) { +static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) { + struct iovec iovec[15]; + char *message = NULL, *syslog_priority = NULL; + unsigned n = 0; + size_t tag_len; + int priority; + assert(s); + assert(p); + + priority = s->priority; + + if (s->priority_prefix && + l > 3 && + p[0] == '<' && + p[1] >= '0' && p[1] <= '7' && + p[2] == '>') { + + priority = p[1] - '0'; + p += 3; + l -= 3; + } + + if (l <= 0) + return 0; + + if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); + + tag_len = s->tag ? strlen(s->tag) + 2: 0; + message = malloc(8 + tag_len + l); + if (message) { + memcpy(message, "MESSAGE=", 8); + + if (s->tag) { + memcpy(message+8, s->tag, tag_len-2); + memcpy(message+8+tag_len-2, ": ", 2); + } - if (ev->events != EPOLLIN) { - log_info("Got invalid event from epoll."); - return -EIO; + memcpy(message+8+tag_len, p, l); + iovec[n].iov_base = message; + iovec[n].iov_len = 8+tag_len+l; + n++; } + dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL); + + if (s->tee_console) { + int console; + + console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC); + if (console >= 0) { + n = 0; + if (s->tag) { + IOVEC_SET_STRING(iovec[n++], s->tag); + IOVEC_SET_STRING(iovec[n++], ": "); + } + + iovec[n].iov_base = (void*) p; + iovec[n].iov_len = l; + n++; + + IOVEC_SET_STRING(iovec[n++], (char*) "\n"); + + writev(console, iovec, n); + } + } + + free(message); + free(syslog_priority); + + return 0; +} + +static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) { + assert(s); + assert(p); + + while (l > 0 && strchr(WHITESPACE, *p)) { + l--; + p++; + } + + while (l > 0 && strchr(WHITESPACE, *(p+l-1))) + l--; + + switch (s->state) { + + case STDOUT_STREAM_TAG: + + if (l > 0) { + s->tag = strndup(p, l); + if (!s->tag) { + log_error("Out of memory"); + return -EINVAL; + } + } + + s->state = STDOUT_STREAM_PRIORITY; + return 0; + + case STDOUT_STREAM_PRIORITY: + if (l != 1 || *p < '0' || *p > '7') { + log_warning("Failed to parse log priority line."); + return -EINVAL; + } + + s->priority = *p - '0'; + s->state = STDOUT_STREAM_PRIORITY_PREFIX; + return 0; + + case STDOUT_STREAM_PRIORITY_PREFIX: + if (l != 1 || *p < '0' || *p > '1') { + log_warning("Failed to parse priority prefix line."); + return -EINVAL; + } + + s->priority_prefix = *p - '0'; + s->state = STDOUT_STREAM_TEE_CONSOLE; + return 0; + + case STDOUT_STREAM_TEE_CONSOLE: + if (l != 1 || *p < '0' || *p > '1') { + log_warning("Failed to parse tee to console line."); + return -EINVAL; + } + + s->tee_console = *p - '0'; + s->state = STDOUT_STREAM_RUNNING; + return 0; + + case STDOUT_STREAM_RUNNING: + return stdout_stream_log(s, p, l); + } + + assert_not_reached("Unknown stream state"); +} + +static int stdout_stream_scan(StdoutStream *s, bool force_flush) { + char *p; + size_t remaining; + int r; + + assert(s); + + p = s->buffer; + remaining = s->length; + for (;;) { + char *end; + size_t skip; + + end = memchr(p, '\n', remaining); + if (!end) { + if (remaining >= LINE_MAX) { + end = p + LINE_MAX; + skip = LINE_MAX; + } else + break; + } else + skip = end - p + 1; + + r = stdout_stream_line(s, p, end - p); + if (r < 0) + return r; + + remaining -= skip; + p += skip; + } + + if (force_flush && remaining > 0) { + r = stdout_stream_line(s, p, remaining); + if (r < 0) + return r; + + p += remaining; + remaining = 0; + } + + if (p > s->buffer) { + memmove(s->buffer, p, remaining); + s->length = remaining; + } + + return 0; +} + +static int stdout_stream_process(StdoutStream *s) { + ssize_t l; + int r; + + assert(s); + + l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length); + if (l < 0) { + + if (errno == EAGAIN) + return 0; + + log_warning("Failed to read from stream: %m"); + return -errno; + } + + if (l == 0) { + r = stdout_stream_scan(s, true); + if (r < 0) + return r; + + return 0; + } + + s->length += l; + r = stdout_stream_scan(s, false); + if (r < 0) + return r; + + return 1; + +} + +static void stdout_stream_free(StdoutStream *s) { + assert(s); + + if (s->server) { + assert(s->server->n_stdout_streams > 0); + s->server->n_stdout_streams --; + LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s); + } + + if (s->fd >= 0) { + if (s->server) + epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL); + + close_nointr_nofail(s->fd); + } + + free(s->tag); + free(s); +} + +static int stdout_stream_new(Server *s) { + StdoutStream *stream; + int fd, r; + socklen_t len; + struct epoll_event ev; + + assert(s); + + fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); + if (fd < 0) { + if (errno == EAGAIN) + return 0; + + log_error("Failed to accept stdout connection: %m"); + return -errno; + } + + if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) { + log_warning("Too many stdout streams, refusing connection."); + close_nointr_nofail(fd); + return 0; + } + + stream = new0(StdoutStream, 1); + if (!stream) { + log_error("Out of memory."); + close_nointr_nofail(fd); + return -ENOMEM; + } + + stream->fd = fd; + + len = sizeof(stream->ucred); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) { + log_error("Failed to determine peer credentials: %m"); + r = -errno; + goto fail; + } + + if (shutdown(fd, SHUT_WR) < 0) { + log_error("Failed to shutdown writing side of socket: %m"); + r = -errno; + goto fail; + } + + zero(ev); + ev.data.ptr = stream; + ev.events = EPOLLIN; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { + log_error("Failed to add stream to event loop: %m"); + r = -errno; + goto fail; + } + + stream->server = s; + LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream); + s->n_stdout_streams ++; + + return 0; + +fail: + stdout_stream_free(stream); + return r; +} + +static int process_event(Server *s, struct epoll_event *ev) { + assert(s); + if (ev->data.fd == s->signal_fd) { struct signalfd_siginfo sfsi; ssize_t n; + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + n = read(s->signal_fd, &sfsi, sizeof(sfsi)); if (n != sizeof(sfsi)) { @@ -578,10 +916,14 @@ static int process_event(Server *s, struct epoll_event *ev) { log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo)); return 0; - } + } else if (ev->data.fd == s->native_fd || + ev->data.fd == s->syslog_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } - if (ev->data.fd == s->native_fd || - ev->data.fd == s->syslog_fd) { for (;;) { struct msghdr msghdr; struct iovec iovec; @@ -668,6 +1010,38 @@ static int process_event(Server *s, struct epoll_event *ev) { } return 1; + + } else if (ev->data.fd == s->stdout_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + stdout_stream_new(s); + return 1; + + } else { + StdoutStream *stream; + + if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + /* If it is none of the well-known fds, it must be an + * stdout stream fd. Note that this is a bit ugly here + * (since we rely that none of the well-known fds + * could be interpreted as pointer), but nonetheless + * safe, since the well-known fds would never get an + * fd > 4096, i.e. beyond the first memory page */ + + stream = ev->data.ptr; + + if (stdout_stream_process(stream) <= 0) + stdout_stream_free(stream); + + return 1; } log_error("Unknown event."); @@ -737,6 +1111,7 @@ static int system_journal_open(Server *s) { static int open_syslog_socket(Server *s) { union sockaddr_union sa; int one, r; + struct epoll_event ev; assert(s); @@ -777,12 +1152,21 @@ static int open_syslog_socket(Server *s) { return -errno; } + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->syslog_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) { + log_error("Failed to add syslog server fd to epoll object: %m"); + return -errno; + } + return 0; } static int open_native_socket(Server*s) { union sockaddr_union sa; int one, r; + struct epoll_event ev; assert(s); @@ -823,24 +1207,110 @@ static int open_native_socket(Server*s) { return -errno; } + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->native_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) { + log_error("Failed to add native server fd to epoll object: %m"); + return -errno; + } + return 0; } -static int server_init(Server *s) { - int n, r, fd; +static int open_stdout_socket(Server *s) { + union sockaddr_union sa; + int r; struct epoll_event ev; + + assert(s); + + if (s->stdout_fd < 0) { + + s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (s->stdout_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + + if (listen(s->stdout_fd, SOMAXCONN) < 0) { + log_error("liste() failed: %m"); + return -errno; + } + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->stdout_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) { + log_error("Failed to add stdout server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_signalfd(Server *s) { sigset_t mask; + struct epoll_event ev; + + assert(s); + + assert_se(sigemptyset(&mask) == 0); + sigset_add_many(&mask, SIGINT, SIGTERM, -1); + assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); + + s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); + if (s->signal_fd < 0) { + log_error("signalfd(): %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->signal_fd; + + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) { + log_error("epoll_ctl(): %m"); + return -errno; + } + + return 0; +} + +static int server_init(Server *s) { + int n, r, fd; assert(s); zero(*s); - s->syslog_fd = s->native_fd = s->signal_fd = -1; + s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1; s->metrics.max_size = DEFAULT_MAX_SIZE; s->metrics.min_size = DEFAULT_MIN_SIZE; s->metrics.keep_free = DEFAULT_KEEP_FREE; s->max_use = DEFAULT_MAX_USE; s->compress = true; + s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func); + if (!s->user_journals) { + log_error("Out of memory."); + return -ENOMEM; + } + s->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (s->epoll_fd < 0) { log_error("Failed to create epoll object: %m"); @@ -855,23 +1325,33 @@ static int server_init(Server *s) { for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) { - if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) { + if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) { - if (s->syslog_fd >= 0) { - log_error("Too many /dev/log sockets passed."); + if (s->native_fd >= 0) { + log_error("Too many native sockets passed."); return -EINVAL; } - s->syslog_fd = fd; + s->native_fd = fd; - } else if (sd_is_socket(fd, AF_UNIX, SOCK_DGRAM, -1) > 0) { + } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) { - if (s->native_fd >= 0) { - log_error("Too many native sockets passed."); + if (s->stdout_fd >= 0) { + log_error("Too many stdout sockets passed."); return -EINVAL; } - s->native_fd = fd; + s->stdout_fd = fd; + + } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) { + + if (s->syslog_fd >= 0) { + log_error("Too many /dev/log sockets passed."); + return -EINVAL; + } + + s->syslog_fd = fd; + } else { log_error("Unknown socket passed."); return -EINVAL; @@ -882,54 +1362,21 @@ static int server_init(Server *s) { if (r < 0) return r; - zero(ev); - ev.events = EPOLLIN; - ev.data.fd = s->syslog_fd; - if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) { - log_error("Failed to add syslog server fd to epoll object: %m"); - return -errno; - } - r = open_native_socket(s); if (r < 0) return r; - zero(ev); - ev.events = EPOLLIN; - ev.data.fd = s->native_fd; - if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) { - log_error("Failed to add native server fd to epoll object: %m"); - return -errno; - } - - s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func); - if (!s->user_journals) { - log_error("Out of memory."); - return -ENOMEM; - } + r = open_stdout_socket(s); + if (r < 0) + return r; r = system_journal_open(s); if (r < 0) return r; - assert_se(sigemptyset(&mask) == 0); - sigset_add_many(&mask, SIGINT, SIGTERM, -1); - assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); - - s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); - if (s->signal_fd < 0) { - log_error("signalfd(): %m"); - return -errno; - } - - zero(ev); - ev.events = EPOLLIN; - ev.data.fd = s->signal_fd; - - if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) { - log_error("epoll_ctl(): %m"); - return -errno; - } + r = open_signalfd(s); + if (r < 0) + return r; return 0; } @@ -938,6 +1385,9 @@ static void server_done(Server *s) { JournalFile *f; assert(s); + while (s->stdout_streams) + stdout_stream_free(s->stdout_streams); + if (s->system_journal) journal_file_close(s->system_journal); @@ -960,6 +1410,9 @@ static void server_done(Server *s) { if (s->native_fd >= 0) close_nointr_nofail(s->native_fd); + + if (s->stdout_fd >= 0) + close_nointr_nofail(s->stdout_fd); } int main(int argc, char *argv[]) { @@ -991,7 +1444,7 @@ int main(int argc, char *argv[]) { sd_notify(false, "READY=1\n" - "STATUS=Processing messages..."); + "STATUS=Processing requests..."); for (;;) { struct epoll_event event; @@ -1015,6 +1468,8 @@ int main(int argc, char *argv[]) { break; } + log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid()); + finish: sd_notify(false, "STATUS=Shutting down..."); diff --git a/src/journal/sd-journal.h b/src/journal/sd-journal.h index b29680b3a..0333db4a4 100644 --- a/src/journal/sd-journal.h +++ b/src/journal/sd-journal.h @@ -33,7 +33,6 @@ * * - check LE/BE conversion for 8bit, 16bit, 32bit values * - implement audit gateway - * - implement stdout gateway * - extend hash tables table as we go * - accelerate looking for "all hostnames" and suchlike. * - throttling @@ -49,6 +48,8 @@ int sd_journal_printv(int priority, const char *format, va_list ap); int sd_journal_send(const char *format, ...) __attribute__((sentinel)); int sd_journal_sendv(const struct iovec *iov, int n); +int sd_journal_stream_fd(const char *tag, int priority, int priority_prefix); + /* Browse journal stream */ typedef struct sd_journal sd_journal; diff --git a/src/stdout-syslog-bridge.c b/src/stdout-syslog-bridge.c index d50df22c8..6ec23ec61 100644 --- a/src/stdout-syslog-bridge.c +++ b/src/stdout-syslog-bridge.c @@ -236,7 +236,6 @@ static int stream_log(Stream *s, char *p, usec_t ts) { writev(console, iovec, 4); } - } return 0; @@ -366,7 +365,6 @@ static int stream_process(Stream *s, usec_t ts) { return -errno; } - if (l == 0) return 0; @@ -409,8 +407,10 @@ static int stream_new(Server *s, int server_fd) { int r; assert(s); + assert(server_fd >= 0); - if ((fd = accept4(server_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC)) < 0) + fd = accept4(server_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); + if (fd < 0) return -errno; if (s->n_streams >= STREAMS_MAX) { diff --git a/src/util.c b/src/util.c index 195835425..c07c569c2 100644 --- a/src/util.c +++ b/src/util.c @@ -2885,7 +2885,8 @@ ssize_t loop_write(int fd, const void *buf, size_t nbytes, bool do_poll) { while (nbytes > 0) { ssize_t k; - if ((k = write(fd, p, nbytes)) <= 0) { + k = write(fd, p, nbytes); + if (k <= 0) { if (k < 0 && errno == EINTR) continue; -- 2.30.2