chiark / gitweb /
journal: if the syslog forwarder socket is full, then don't block
[elogind.git] / src / journal / journald.c
index d7f0ed52b4ac742ac10c52560e105c2feba905f1..f924c9353b96a5bb0d4db1af2561b97691ebda56 100644 (file)
@@ -25,8 +25,6 @@
 #include <sys/signalfd.h>
 #include <unistd.h>
 #include <fcntl.h>
-#include <sys/acl.h>
-#include <acl/libacl.h>
 #include <stddef.h>
 #include <sys/ioctl.h>
 #include <linux/sockios.h>
 #include "hashmap.h"
 #include "journal-file.h"
 #include "socket-util.h"
-#include "acl-util.h"
 #include "cgroup-util.h"
 #include "list.h"
 #include "journal-rate-limit.h"
 #include "journal-internal.h"
 #include "conf-parser.h"
 #include "journald.h"
+#include "virt.h"
+
+#ifdef HAVE_ACL
+#include <sys/acl.h>
+#include <acl/libacl.h>
+#include "acl-util.h"
+#endif
+
+#ifdef HAVE_SELINUX
+#include <selinux/selinux.h>
+#endif
 
 #define USER_JOURNALS_MAX 1024
 #define STDOUT_STREAMS_MAX 4096
 
 #define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)
 
-#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)
+#define SYSLOG_TIMEOUT_USEC (250*USEC_PER_MSEC)
 
-#define N_IOVEC_META_FIELDS 16
+#define N_IOVEC_META_FIELDS 17
 
 typedef enum StdoutStreamState {
-        STDOUT_STREAM_TAG,
+        STDOUT_STREAM_IDENTIFIER,
         STDOUT_STREAM_PRIORITY,
-        STDOUT_STREAM_PRIORITY_PREFIX,
+        STDOUT_STREAM_LEVEL_PREFIX,
         STDOUT_STREAM_FORWARD_TO_SYSLOG,
         STDOUT_STREAM_FORWARD_TO_KMSG,
         STDOUT_STREAM_FORWARD_TO_CONSOLE,
@@ -80,9 +88,9 @@ struct StdoutStream {
 
         struct ucred ucred;
 
-        char *tag;
+        char *identifier;
         int priority;
-        bool priority_prefix:1;
+        bool level_prefix:1;
         bool forward_to_syslog:1;
         bool forward_to_kmsg:1;
         bool forward_to_console:1;
@@ -179,18 +187,42 @@ finish:
         return avail;
 }
 
-static void fix_perms(JournalFile *f, uid_t uid) {
+static void server_read_file_gid(Server *s) {
+        const char *adm = "adm";
+        int r;
+
+        assert(s);
+
+        if (s->file_gid_valid)
+                return;
+
+        r = get_group_creds(&adm, &s->file_gid);
+        if (r < 0)
+                log_warning("Failed to resolve 'adm' group: %s", strerror(-r));
+
+        /* if we couldn't read the gid, then it will be 0, but that's
+         * fine and we shouldn't try to resolve the group again, so
+         * let's just pretend it worked right-away. */
+        s->file_gid_valid = true;
+}
+
+static void server_fix_perms(Server *s, JournalFile *f, uid_t uid) {
+        int r;
+#ifdef HAVE_ACL
         acl_t acl;
         acl_entry_t entry;
         acl_permset_t permset;
-        int r;
+#endif
 
         assert(f);
 
-        r = fchmod_and_fchown(f->fd, 0640, 0, 0);
+        server_read_file_gid(s);
+
+        r = fchmod_and_fchown(f->fd, 0640, 0, s->file_gid);
         if (r < 0)
                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
 
+#ifdef HAVE_ACL
         if (uid <= 0)
                 return;
 
@@ -223,6 +255,7 @@ static void fix_perms(JournalFile *f, uid_t uid) {
 
 finish:
         acl_free(acl);
+#endif
 }
 
 static JournalFile* find_journal(Server *s, uid_t uid) {
@@ -269,7 +302,7 @@ static JournalFile* find_journal(Server *s, uid_t uid) {
         if (r < 0)
                 return s->system_journal;
 
-        fix_perms(f, uid);
+        server_fix_perms(s, f, uid);
         f->metrics = s->system_metrics;
         f->compress = s->compress;
 
@@ -407,7 +440,7 @@ static void dispatch_message_real(Server *s,
                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
                 *audit_session = NULL, *audit_loginuid = NULL,
                 *exe = NULL, *cgroup = NULL, *session = NULL,
-                *owner_uid = NULL, *unit = NULL;
+                *owner_uid = NULL, *unit = NULL, *selinux_context = NULL;
 
         char idbuf[33];
         sd_id128_t id;
@@ -425,6 +458,9 @@ static void dispatch_message_real(Server *s,
         if (ucred) {
                 uint32_t audit;
                 uid_t owner;
+#ifdef HAVE_SELINUX
+                security_context_t con;
+#endif
 
                 realuid = ucred->uid;
 
@@ -502,6 +538,16 @@ static void dispatch_message_real(Server *s,
                 if (sd_pid_get_owner_uid(ucred->uid, &owner) >= 0)
                         if (asprintf(&owner_uid, "_SYSTEMD_OWNER_UID=%lu", (unsigned long) owner) >= 0)
                                 IOVEC_SET_STRING(iovec[n++], owner_uid);
+
+#ifdef HAVE_SELINUX
+                if (getpidcon(ucred->pid, &con) >= 0) {
+                        selinux_context = strappend("_SELINUX_CONTEXT=", con);
+                        if (selinux_context)
+                                IOVEC_SET_STRING(iovec[n++], selinux_context);
+
+                        freecon(con);
+                }
+#endif
         }
 
         if (tv) {
@@ -573,12 +619,13 @@ retry:
         free(session);
         free(owner_uid);
         free(unit);
+        free(selinux_context);
 }
 
 static void driver_message(Server *s, sd_id128_t message_id, const char *format, ...) {
         char mid[11 + 32 + 1];
         char buffer[16 + LINE_MAX + 1];
-        struct iovec iovec[N_IOVEC_META_FIELDS + 3];
+        struct iovec iovec[N_IOVEC_META_FIELDS + 4];
         int n = 0;
         va_list ap;
         struct ucred ucred;
@@ -587,6 +634,7 @@ static void driver_message(Server *s, sd_id128_t message_id, const char *format,
         assert(format);
 
         IOVEC_SET_STRING(iovec[n++], "PRIORITY=5");
+        IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=driver");
 
         memcpy(buffer, "MESSAGE=", 8);
         va_start(ap, format);
@@ -680,7 +728,7 @@ static void forward_syslog_iovec(Server *s, const struct iovec *iovec, unsigned
 
         zero(sa);
         sa.un.sun_family = AF_UNIX;
-        strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
+        strncpy(sa.un.sun_path, "/run/systemd/journal/syslog", sizeof(sa.un.sun_path));
         msghdr.msg_name = &sa;
         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
 
@@ -704,6 +752,11 @@ static void forward_syslog_iovec(Server *s, const struct iovec *iovec, unsigned
         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
                 return;
 
+        /* The socket is full? I guess the syslog implementation is
+         * too slow, and we shouldn't wait for that... */
+        if (errno == EAGAIN)
+                return;
+
         if (ucred && errno == ESRCH) {
                 struct ucred u;
 
@@ -717,6 +770,9 @@ static void forward_syslog_iovec(Server *s, const struct iovec *iovec, unsigned
 
                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
                         return;
+
+                if (errno == EAGAIN)
+                        return;
         }
 
         log_debug("Failed to forward syslog message: %m");
@@ -732,13 +788,13 @@ static void forward_syslog_raw(Server *s, const char *buffer, struct ucred *ucre
         forward_syslog_iovec(s, &iovec, 1, ucred, tv);
 }
 
-static void forward_syslog(Server *s, int priority, const char *tag, const char *message, struct ucred *ucred, struct timeval *tv) {
+static void forward_syslog(Server *s, int priority, const char *identifier, const char *message, struct ucred *ucred, struct timeval *tv) {
         struct iovec iovec[5];
         char header_priority[6], header_time[64], header_pid[16];
         int n = 0;
         time_t t;
         struct tm *tm;
-        char *tag_buf = NULL;
+        char *ident_buf = NULL;
 
         assert(s);
         assert(priority >= 0);
@@ -759,22 +815,22 @@ static void forward_syslog(Server *s, int priority, const char *tag, const char
                 return;
         IOVEC_SET_STRING(iovec[n++], header_time);
 
-        /* Third: tag and PID */
+        /* Third: identifier and PID */
         if (ucred) {
-                if (!tag) {
-                        get_process_comm(ucred->pid, &tag_buf);
-                        tag = tag_buf;
+                if (!identifier) {
+                        get_process_comm(ucred->pid, &ident_buf);
+                        identifier = ident_buf;
                 }
 
                 snprintf(header_pid, sizeof(header_pid), "[%lu]: ", (unsigned long) ucred->pid);
                 char_array_0(header_pid);
 
-                if (tag)
-                        IOVEC_SET_STRING(iovec[n++], tag);
+                if (identifier)
+                        IOVEC_SET_STRING(iovec[n++], identifier);
 
                 IOVEC_SET_STRING(iovec[n++], header_pid);
-        } else if (tag) {
-                IOVEC_SET_STRING(iovec[n++], tag);
+        } else if (identifier) {
+                IOVEC_SET_STRING(iovec[n++], identifier);
                 IOVEC_SET_STRING(iovec[n++], ": ");
         }
 
@@ -783,7 +839,7 @@ static void forward_syslog(Server *s, int priority, const char *tag, const char
 
         forward_syslog_iovec(s, iovec, n, ucred, tv);
 
-        free(tag_buf);
+        free(ident_buf);
 }
 
 static int fixup_priority(int priority) {
@@ -794,11 +850,11 @@ static int fixup_priority(int priority) {
         return priority;
 }
 
-static void forward_kmsg(Server *s, int priority, const char *tag, const char *message, struct ucred *ucred) {
+static void forward_kmsg(Server *s, int priority, const char *identifier, const char *message, struct ucred *ucred) {
         struct iovec iovec[5];
         char header_priority[6], header_pid[16];
         int n = 0;
-        char *tag_buf = NULL;
+        char *ident_buf = NULL;
         int fd;
 
         assert(s);
@@ -815,22 +871,22 @@ static void forward_kmsg(Server *s, int priority, const char *tag, const char *m
         char_array_0(header_priority);
         IOVEC_SET_STRING(iovec[n++], header_priority);
 
-        /* Second: tag and PID */
+        /* Second: identifier and PID */
         if (ucred) {
-                if (!tag) {
-                        get_process_comm(ucred->pid, &tag_buf);
-                        tag = tag_buf;
+                if (!identifier) {
+                        get_process_comm(ucred->pid, &ident_buf);
+                        identifier = ident_buf;
                 }
 
                 snprintf(header_pid, sizeof(header_pid), "[%lu]: ", (unsigned long) ucred->pid);
                 char_array_0(header_pid);
 
-                if (tag)
-                        IOVEC_SET_STRING(iovec[n++], tag);
+                if (identifier)
+                        IOVEC_SET_STRING(iovec[n++], identifier);
 
                 IOVEC_SET_STRING(iovec[n++], header_pid);
-        } else if (tag) {
-                IOVEC_SET_STRING(iovec[n++], tag);
+        } else if (identifier) {
+                IOVEC_SET_STRING(iovec[n++], identifier);
                 IOVEC_SET_STRING(iovec[n++], ": ");
         }
 
@@ -850,34 +906,34 @@ static void forward_kmsg(Server *s, int priority, const char *tag, const char *m
         close_nointr_nofail(fd);
 
 finish:
-        free(tag_buf);
+        free(ident_buf);
 }
 
-static void forward_console(Server *s, const char *tag, const char *message, struct ucred *ucred) {
+static void forward_console(Server *s, const char *identifier, const char *message, struct ucred *ucred) {
         struct iovec iovec[4];
         char header_pid[16];
         int n = 0, fd;
-        char *tag_buf = NULL;
+        char *ident_buf = NULL;
 
         assert(s);
         assert(message);
 
-        /* First: tag and PID */
+        /* First: identifier and PID */
         if (ucred) {
-                if (!tag) {
-                        get_process_comm(ucred->pid, &tag_buf);
-                        tag = tag_buf;
+                if (!identifier) {
+                        get_process_comm(ucred->pid, &ident_buf);
+                        identifier = ident_buf;
                 }
 
                 snprintf(header_pid, sizeof(header_pid), "[%lu]: ", (unsigned long) ucred->pid);
                 char_array_0(header_pid);
 
-                if (tag)
-                        IOVEC_SET_STRING(iovec[n++], tag);
+                if (identifier)
+                        IOVEC_SET_STRING(iovec[n++], identifier);
 
                 IOVEC_SET_STRING(iovec[n++], header_pid);
-        } else if (tag) {
-                IOVEC_SET_STRING(iovec[n++], tag);
+        } else if (identifier) {
+                IOVEC_SET_STRING(iovec[n++], identifier);
                 IOVEC_SET_STRING(iovec[n++], ": ");
         }
 
@@ -897,16 +953,17 @@ static void forward_console(Server *s, const char *tag, const char *message, str
         close_nointr_nofail(fd);
 
 finish:
-        free(tag_buf);
+        free(ident_buf);
 }
 
-static void read_tag(const char **buf, char **tag) {
+static void read_identifier(const char **buf, char **identifier, char **pid) {
         const char *p;
         char *t;
         size_t l, e;
 
         assert(buf);
-        assert(tag);
+        assert(identifier);
+        assert(pid);
 
         p = *buf;
 
@@ -926,6 +983,10 @@ static void read_tag(const char **buf, char **tag) {
                 for (;;) {
 
                         if (p[k] == '[') {
+                                t = strndup(p+k+1, l-k-2);
+                                if (t)
+                                        *pid = t;
+
                                 l = k;
                                 break;
                         }
@@ -939,18 +1000,18 @@ static void read_tag(const char **buf, char **tag) {
 
         t = strndup(p, l);
         if (t)
-                *tag = t;
+                *identifier = t;
 
         *buf = p + e;
         *buf += strspn(*buf, WHITESPACE);
 }
 
 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
-        char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL, *syslog_tag = NULL;
-        struct iovec iovec[N_IOVEC_META_FIELDS + 4];
+        char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL, *syslog_identifier = NULL, *syslog_pid = NULL;
+        struct iovec iovec[N_IOVEC_META_FIELDS + 6];
         unsigned n = 0;
         int priority = LOG_USER | LOG_INFO;
-        char *tag = NULL;
+        char *identifier = NULL, *pid = NULL;
 
         assert(s);
         assert(buf);
@@ -960,13 +1021,15 @@ static void process_syslog_message(Server *s, const char *buf, struct ucred *ucr
 
         parse_syslog_priority((char**) &buf, &priority);
         skip_syslog_date((char**) &buf);
-        read_tag(&buf, &tag);
+        read_identifier(&buf, &identifier, &pid);
 
         if (s->forward_to_kmsg)
-                forward_kmsg(s, priority, tag, buf, ucred);
+                forward_kmsg(s, priority, identifier, buf, ucred);
 
         if (s->forward_to_console)
-                forward_console(s, tag, buf, ucred);
+                forward_console(s, identifier, buf, ucred);
+
+        IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=syslog");
 
         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
@@ -975,10 +1038,16 @@ static void process_syslog_message(Server *s, const char *buf, struct ucred *ucr
                 if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
                         IOVEC_SET_STRING(iovec[n++], syslog_facility);
 
-        if (tag) {
-                syslog_tag = strappend("SYSLOG_TAG=", tag);
-                if (syslog_tag)
-                        IOVEC_SET_STRING(iovec[n++], syslog_tag);
+        if (identifier) {
+                syslog_identifier = strappend("SYSLOG_IDENTIFIER=", identifier);
+                if (syslog_identifier)
+                        IOVEC_SET_STRING(iovec[n++], syslog_identifier);
+        }
+
+        if (pid) {
+                syslog_pid = strappend("SYSLOG_PID=", pid);
+                if (syslog_pid)
+                        IOVEC_SET_STRING(iovec[n++], syslog_pid);
         }
 
         message = strappend("MESSAGE=", buf);
@@ -988,10 +1057,11 @@ static void process_syslog_message(Server *s, const char *buf, struct ucred *ucr
         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority);
 
         free(message);
-        free(tag);
+        free(identifier);
+        free(pid);
         free(syslog_priority);
         free(syslog_facility);
-        free(syslog_tag);
+        free(syslog_identifier);
 }
 
 static bool valid_user_field(const char *p, size_t l) {
@@ -1031,11 +1101,11 @@ static bool valid_user_field(const char *p, size_t l) {
 
 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
         struct iovec *iovec = NULL;
-        unsigned n = 0, m = 0, j;
+        unsigned n = 0, m = 0, j, tn = (unsigned) -1;
         const char *p;
         size_t remaining;
         int priority = LOG_INFO;
-        char *tag = NULL, *message = NULL;
+        char *identifier = NULL, *message = NULL;
 
         assert(s);
         assert(buffer || n == 0);
@@ -1079,7 +1149,7 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_
                         struct iovec *c;
                         unsigned u;
 
-                        u = MAX((n+N_IOVEC_META_FIELDS) * 2U, 4U);
+                        u = MAX((n+N_IOVEC_META_FIELDS+1) * 2U, 4U);
                         c = realloc(iovec, u * sizeof(struct iovec));
                         if (!c) {
                                 log_error("Out of memory");
@@ -1125,13 +1195,13 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_
                                         priority = (priority & LOG_PRIMASK) | (((p[16] - '0')*10 + (p[17] - '0')) << 3);
 
                                 else if (l >= 12 &&
-                                         memcmp(p, "SYSLOG_TAG=", 11) == 0) {
+                                         memcmp(p, "SYSLOG_IDENTIFIER=", 11) == 0) {
                                         char *t;
 
                                         t = strndup(p + 11, l - 11);
                                         if (t) {
-                                                free(tag);
-                                                tag = t;
+                                                free(identifier);
+                                                identifier = t;
                                         }
                                 } else if (l >= 8 &&
                                            memcmp(p, "MESSAGE=", 8) == 0) {
@@ -1188,31 +1258,42 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_
                 }
         }
 
+        if (n <= 0)
+                goto finish;
+
+        tn = n++;
+        IOVEC_SET_STRING(iovec[tn], "_TRANSPORT=journal");
+
         if (message) {
                 if (s->forward_to_syslog)
-                        forward_syslog(s, priority, tag, message, ucred, tv);
+                        forward_syslog(s, priority, identifier, message, ucred, tv);
 
                 if (s->forward_to_kmsg)
-                        forward_kmsg(s, priority, tag, message, ucred);
+                        forward_kmsg(s, priority, identifier, message, ucred);
 
                 if (s->forward_to_console)
-                        forward_console(s, tag, message, ucred);
+                        forward_console(s, identifier, message, ucred);
         }
 
         dispatch_message(s, iovec, n, m, ucred, tv, priority);
 
-        for (j = 0; j < n; j++)
+finish:
+        for (j = 0; j < n; j++)  {
+                if (j == tn)
+                        continue;
+
                 if (iovec[j].iov_base < buffer ||
                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
                         free(iovec[j].iov_base);
+        }
 
-        free(tag);
+        free(identifier);
         free(message);
 }
 
 static int stdout_stream_log(StdoutStream *s, const char *p) {
-        struct iovec iovec[N_IOVEC_META_FIELDS + 4];
-        char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL, *syslog_tag = NULL;
+        struct iovec iovec[N_IOVEC_META_FIELDS + 5];
+        char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL, *syslog_identifier = NULL;
         unsigned n = 0;
         int priority;
 
@@ -1221,17 +1302,19 @@ static int stdout_stream_log(StdoutStream *s, const char *p) {
 
         priority = s->priority;
 
-        if (s->priority_prefix)
+        if (s->level_prefix)
                 parse_syslog_priority((char**) &p, &priority);
 
         if (s->forward_to_syslog || s->server->forward_to_syslog)
-                forward_syslog(s->server, fixup_priority(priority), s->tag, p, &s->ucred, NULL);
+                forward_syslog(s->server, fixup_priority(priority), s->identifier, p, &s->ucred, NULL);
 
         if (s->forward_to_kmsg || s->server->forward_to_kmsg)
-                forward_kmsg(s->server, priority, s->tag, p, &s->ucred);
+                forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
 
         if (s->forward_to_console || s->server->forward_to_console)
-                forward_console(s->server, s->tag, p, &s->ucred);
+                forward_console(s->server, s->identifier, p, &s->ucred);
+
+        IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
 
         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
@@ -1240,10 +1323,10 @@ static int stdout_stream_log(StdoutStream *s, const char *p) {
                 if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
                         IOVEC_SET_STRING(iovec[n++], syslog_facility);
 
-        if (s->tag) {
-                syslog_tag = strappend("SYSLOG_TAG=", s->tag);
-                if (syslog_tag)
-                        IOVEC_SET_STRING(iovec[n++], syslog_tag);
+        if (s->identifier) {
+                syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
+                if (syslog_identifier)
+                        IOVEC_SET_STRING(iovec[n++], syslog_identifier);
         }
 
         message = strappend("MESSAGE=", p);
@@ -1255,7 +1338,7 @@ static int stdout_stream_log(StdoutStream *s, const char *p) {
         free(message);
         free(syslog_priority);
         free(syslog_facility);
-        free(syslog_tag);
+        free(syslog_identifier);
 
         return 0;
 }
@@ -1270,9 +1353,9 @@ static int stdout_stream_line(StdoutStream *s, char *p) {
 
         switch (s->state) {
 
-        case STDOUT_STREAM_TAG:
-                s->tag = strdup(p);
-                if (!s->tag) {
+        case STDOUT_STREAM_IDENTIFIER:
+                s->identifier = strdup(p);
+                if (!s->identifier) {
                         log_error("Out of memory");
                         return -ENOMEM;
                 }
@@ -1287,17 +1370,17 @@ static int stdout_stream_line(StdoutStream *s, char *p) {
                         return -EINVAL;
                 }
 
-                s->state = STDOUT_STREAM_PRIORITY_PREFIX;
+                s->state = STDOUT_STREAM_LEVEL_PREFIX;
                 return 0;
 
-        case STDOUT_STREAM_PRIORITY_PREFIX:
+        case STDOUT_STREAM_LEVEL_PREFIX:
                 r = parse_boolean(p);
                 if (r < 0) {
-                        log_warning("Failed to parse priority prefix line.");
+                        log_warning("Failed to parse level prefix line.");
                         return -EINVAL;
                 }
 
-                s->priority_prefix = !!r;
+                s->level_prefix = !!r;
                 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
                 return 0;
 
@@ -1359,7 +1442,7 @@ static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
                         skip = end - p + 1;
                 else if (remaining >= sizeof(s->buffer) - 1) {
                         end = p + sizeof(s->buffer) - 1;
-                        skip = sizeof(s->buffer) - 1;
+                        skip = remaining;
                 } else
                         break;
 
@@ -1440,7 +1523,7 @@ static void stdout_stream_free(StdoutStream *s) {
                 close_nointr_nofail(s->fd);
         }
 
-        free(s->tag);
+        free(s->identifier);
         free(s);
 }
 
@@ -1509,6 +1592,159 @@ fail:
         return r;
 }
 
+static int parse_kernel_timestamp(char **_p, usec_t *t) {
+        usec_t r;
+        int k, i;
+        char *p;
+
+        assert(_p);
+        assert(*_p);
+        assert(t);
+
+        p = *_p;
+
+        if (strlen(p) < 14 || p[0] != '[' || p[13] != ']' || p[6] != '.')
+                return 0;
+
+        r = 0;
+
+        for (i = 1; i <= 5; i++) {
+                r *= 10;
+
+                if (p[i] == ' ')
+                        continue;
+
+                k = undecchar(p[i]);
+                if (k < 0)
+                        return 0;
+
+                r += k;
+        }
+
+        for (i = 7; i <= 12; i++) {
+                r *= 10;
+
+                k = undecchar(p[i]);
+                if (k < 0)
+                        return 0;
+
+                r += k;
+        }
+
+        *t = r;
+        *_p += 14;
+        *_p += strspn(*_p, WHITESPACE);
+
+        return 1;
+}
+
+static void proc_kmsg_line(Server *s, const char *p) {
+        struct iovec iovec[N_IOVEC_META_FIELDS + 7];
+        char *message = NULL, *syslog_priority = NULL, *syslog_pid = NULL, *syslog_facility = NULL, *syslog_identifier = NULL, *source_time = NULL;
+        int priority = LOG_KERN | LOG_INFO;
+        unsigned n = 0;
+        usec_t usec;
+        char *identifier = NULL, *pid = NULL;
+
+        assert(s);
+        assert(p);
+
+        parse_syslog_priority((char **) &p, &priority);
+
+        if (s->forward_to_kmsg && (priority & LOG_FACMASK) != LOG_KERN)
+                return;
+
+        if (parse_kernel_timestamp((char **) &p, &usec) > 0) {
+                if (asprintf(&source_time, "_SOURCE_MONOTONIC_TIMESTAMP=%llu",
+                             (unsigned long long) usec) >= 0)
+                        IOVEC_SET_STRING(iovec[n++], source_time);
+        }
+
+        IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=kernel");
+
+        if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
+                IOVEC_SET_STRING(iovec[n++], syslog_priority);
+
+        if ((priority & LOG_FACMASK) == LOG_KERN) {
+
+                if (s->forward_to_syslog)
+                        forward_syslog(s, priority, "kernel", p, NULL, NULL);
+
+                IOVEC_SET_STRING(iovec[n++], "SYSLOG_IDENTIFIER=kernel");
+        } else {
+                read_identifier(&p, &identifier, &pid);
+
+                if (s->forward_to_syslog)
+                        forward_syslog(s, priority, identifier, p, NULL, NULL);
+
+                if (identifier) {
+                        syslog_identifier = strappend("SYSLOG_IDENTIFIER=", identifier);
+                        if (syslog_identifier)
+                                IOVEC_SET_STRING(iovec[n++], syslog_identifier);
+                }
+
+                if (pid) {
+                        syslog_pid = strappend("SYSLOG_PID=", pid);
+                        if (syslog_pid)
+                                IOVEC_SET_STRING(iovec[n++], syslog_pid);
+                }
+
+                if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
+                        IOVEC_SET_STRING(iovec[n++], syslog_facility);
+        }
+
+        message = strappend("MESSAGE=", p);
+        if (message)
+                IOVEC_SET_STRING(iovec[n++], message);
+
+
+        dispatch_message(s, iovec, n, ELEMENTSOF(iovec), NULL, NULL, priority);
+
+        free(message);
+        free(syslog_priority);
+        free(syslog_identifier);
+        free(syslog_pid);
+        free(syslog_facility);
+        free(source_time);
+        free(identifier);
+        free(pid);
+}
+
+static void proc_kmsg_scan(Server *s) {
+        char *p;
+        size_t remaining;
+
+        assert(s);
+
+        p = s->proc_kmsg_buffer;
+        remaining = s->proc_kmsg_length;
+        for (;;) {
+                char *end;
+                size_t skip;
+
+                end = memchr(p, '\n', remaining);
+                if (end)
+                        skip = end - p + 1;
+                else if (remaining >= sizeof(s->proc_kmsg_buffer) - 1) {
+                        end = p + sizeof(s->proc_kmsg_buffer) - 1;
+                        skip = remaining;
+                } else
+                        break;
+
+                *end = 0;
+
+                proc_kmsg_line(s, p);
+
+                remaining -= skip;
+                p += skip;
+        }
+
+        if (p > s->proc_kmsg_buffer) {
+                memmove(s->proc_kmsg_buffer, p, remaining);
+                s->proc_kmsg_length = remaining;
+        }
+}
+
 static int system_journal_open(Server *s) {
         int r;
         char *fn;
@@ -1544,7 +1780,7 @@ static int system_journal_open(Server *s) {
                         s->system_journal->metrics = s->system_metrics;
                         s->system_journal->compress = s->compress;
 
-                        fix_perms(s->system_journal, 0);
+                        server_fix_perms(s, s->system_journal, 0);
                 } else if (r < 0) {
 
                         if (r != -ENOENT && r != -EROFS)
@@ -1597,7 +1833,7 @@ static int system_journal_open(Server *s) {
                         s->runtime_journal->metrics = s->runtime_metrics;
                         s->runtime_journal->compress = s->compress;
 
-                        fix_perms(s->runtime_journal, 0);
+                        server_fix_perms(s, s->runtime_journal, 0);
                 }
         }
 
@@ -1628,6 +1864,8 @@ static int server_flush_to_var(Server *s) {
         if (!s->system_journal)
                 return 0;
 
+        log_info("Flushing to /var...");
+
         r = sd_id128_get_machine(&machine);
         if (r < 0) {
                 log_error("Failed to get machine id: %s", strerror(-r));
@@ -1683,6 +1921,49 @@ finish:
         return r;
 }
 
+static int server_read_proc_kmsg(Server *s) {
+        ssize_t l;
+        assert(s);
+        assert(s->proc_kmsg_fd >= 0);
+
+        l = read(s->proc_kmsg_fd, s->proc_kmsg_buffer + s->proc_kmsg_length, sizeof(s->proc_kmsg_buffer) - 1 - s->proc_kmsg_length);
+        if (l < 0) {
+
+                if (errno == EAGAIN || errno == EINTR)
+                        return 0;
+
+                log_error("Failed to read from kernel: %m");
+                return -errno;
+        }
+
+        s->proc_kmsg_length += l;
+
+        proc_kmsg_scan(s);
+        return 1;
+}
+
+static int server_flush_proc_kmsg(Server *s) {
+        int r;
+
+        assert(s);
+
+        if (s->proc_kmsg_fd < 0)
+                return 0;
+
+        log_info("Flushing /proc/kmsg...");
+
+        for (;;) {
+                r = server_read_proc_kmsg(s);
+                if (r < 0)
+                        return r;
+
+                if (r == 0)
+                        break;
+        }
+
+        return 0;
+}
+
 static int process_event(Server *s, struct epoll_event *ev) {
         assert(s);
 
@@ -1702,7 +1983,7 @@ static int process_event(Server *s, struct epoll_event *ev) {
                                 return -EIO;
 
                         if (errno == EINTR || errno == EAGAIN)
-                                return 0;
+                                return 1;
 
                         return -errno;
                 }
@@ -1715,6 +1996,20 @@ static int process_event(Server *s, struct epoll_event *ev) {
                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
                 return 0;
 
+        } else if (ev->data.fd == s->proc_kmsg_fd) {
+                int r;
+
+                if (ev->events != EPOLLIN) {
+                        log_info("Got invalid event from epoll.");
+                        return -EIO;
+                }
+
+                r = server_read_proc_kmsg(s);
+                if (r < 0)
+                        return r;
+
+                return 1;
+
         } else if (ev->data.fd == s->native_fd ||
                    ev->data.fd == s->syslog_fd) {
 
@@ -1857,7 +2152,7 @@ static int open_syslog_socket(Server *s) {
 
         if (s->syslog_fd < 0) {
 
-                s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
+                s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
                 if (s->syslog_fd < 0) {
                         log_error("socket() failed: %m");
                         return -errno;
@@ -1921,7 +2216,7 @@ static int open_native_socket(Server*s) {
 
         if (s->native_fd < 0) {
 
-                s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
+                s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
                 if (s->native_fd < 0) {
                         log_error("socket() failed: %m");
                         return -errno;
@@ -1929,7 +2224,7 @@ static int open_native_socket(Server*s) {
 
                 zero(sa);
                 sa.un.sun_family = AF_UNIX;
-                strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
+                strncpy(sa.un.sun_path, "/run/systemd/journal/socket", sizeof(sa.un.sun_path));
 
                 unlink(sa.un.sun_path);
 
@@ -1976,7 +2271,7 @@ static int open_stdout_socket(Server *s) {
 
         if (s->stdout_fd < 0) {
 
-                s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
+                s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
                 if (s->stdout_fd < 0) {
                         log_error("socket() failed: %m");
                         return -errno;
@@ -1984,7 +2279,7 @@ static int open_stdout_socket(Server *s) {
 
                 zero(sa);
                 sa.un.sun_family = AF_UNIX;
-                strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
+                strncpy(sa.un.sun_path, "/run/systemd/journal/stdout", sizeof(sa.un.sun_path));
 
                 unlink(sa.un.sun_path);
 
@@ -2013,6 +2308,32 @@ static int open_stdout_socket(Server *s) {
         return 0;
 }
 
+static int open_proc_kmsg(Server *s) {
+        struct epoll_event ev;
+
+        assert(s);
+
+        if (!s->import_proc_kmsg)
+                return 0;
+
+
+        s->proc_kmsg_fd = open("/proc/kmsg", O_CLOEXEC|O_NONBLOCK|O_NOCTTY);
+        if (s->proc_kmsg_fd < 0) {
+                log_warning("Failed to open /proc/kmsg, ignoring: %m");
+                return 0;
+        }
+
+        zero(ev);
+        ev.events = EPOLLIN;
+        ev.data.fd = s->proc_kmsg_fd;
+        if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->proc_kmsg_fd, &ev) < 0) {
+                log_error("Failed to add /proc/kmsg fd to epoll object: %m");
+                return -errno;
+        }
+
+        return 0;
+}
+
 static int open_signalfd(Server *s) {
         sigset_t mask;
         struct epoll_event ev;
@@ -2041,6 +2362,59 @@ static int open_signalfd(Server *s) {
         return 0;
 }
 
+static int server_parse_proc_cmdline(Server *s) {
+        char *line, *w, *state;
+        int r;
+        size_t l;
+
+        if (detect_container(NULL) > 0)
+                return 0;
+
+        r = read_one_line_file("/proc/cmdline", &line);
+        if (r < 0) {
+                log_warning("Failed to read /proc/cmdline, ignoring: %s", strerror(-r));
+                return 0;
+        }
+
+        FOREACH_WORD_QUOTED(w, l, line, state) {
+                char *word;
+
+                word = strndup(w, l);
+                if (!word) {
+                        r = -ENOMEM;
+                        goto finish;
+                }
+
+                if (startswith(word, "systemd_journald.forward_to_syslog=")) {
+                        r = parse_boolean(word + 35);
+                        if (r < 0)
+                                log_warning("Failed to parse forward to syslog switch %s. Ignoring.", word + 35);
+                        else
+                                s->forward_to_syslog = r;
+                } else if (startswith(word, "systemd_journald.forward_to_kmsg=")) {
+                        r = parse_boolean(word + 33);
+                        if (r < 0)
+                                log_warning("Failed to parse forward to kmsg switch %s. Ignoring.", word + 33);
+                        else
+                                s->forward_to_kmsg = r;
+                } else if (startswith(word, "systemd_journald.forward_to_console=")) {
+                        r = parse_boolean(word + 36);
+                        if (r < 0)
+                                log_warning("Failed to parse forward to console switch %s. Ignoring.", word + 36);
+                        else
+                                s->forward_to_console = r;
+                }
+
+                free(word);
+        }
+
+        r = 0;
+
+finish:
+        free(line);
+        return r;
+}
+
 static int server_parse_config_file(Server *s) {
         FILE *f;
         const char *fn;
@@ -2073,18 +2447,20 @@ static int server_init(Server *s) {
         assert(s);
 
         zero(*s);
-        s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
+        s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = s->proc_kmsg_fd = -1;
         s->compress = true;
 
         s->rate_limit_interval = DEFAULT_RATE_LIMIT_INTERVAL;
         s->rate_limit_burst = DEFAULT_RATE_LIMIT_BURST;
 
         s->forward_to_syslog = true;
+        s->import_proc_kmsg = true;
 
         memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
         memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
 
         server_parse_config_file(s);
+        server_parse_proc_cmdline(s);
 
         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
         if (!s->user_journals) {
@@ -2106,7 +2482,7 @@ static int server_init(Server *s) {
 
         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
 
-                if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
+                if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/journal/socket", 0) > 0) {
 
                         if (s->native_fd >= 0) {
                                 log_error("Too many native sockets passed.");
@@ -2115,7 +2491,7 @@ static int server_init(Server *s) {
 
                         s->native_fd = fd;
 
-                } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
+                } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/journal/stdout", 0) > 0) {
 
                         if (s->stdout_fd >= 0) {
                                 log_error("Too many stdout sockets passed.");
@@ -2151,6 +2527,10 @@ static int server_init(Server *s) {
         if (r < 0)
                 return r;
 
+        r = open_proc_kmsg(s);
+        if (r < 0)
+                return r;
+
         r = system_journal_open(s);
         if (r < 0)
                 return r;
@@ -2199,6 +2579,9 @@ static void server_done(Server *s) {
         if (s->stdout_fd >= 0)
                 close_nointr_nofail(s->stdout_fd);
 
+        if (s->proc_kmsg_fd >= 0)
+                close_nointr_nofail(s->proc_kmsg_fd);
+
         if (s->rate_limit)
                 journal_rate_limit_free(s->rate_limit);
 
@@ -2231,6 +2614,7 @@ int main(int argc, char *argv[]) {
 
         server_vacuum(&server);
         server_flush_to_var(&server);
+        server_flush_proc_kmsg(&server);
 
         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
         driver_message(&server, SD_MESSAGE_JOURNAL_START, "Journal started");