+ return 0;
+}
+
+static int stdout_stream_process(StdoutStream *s) {
+ ssize_t l;
+ int r;
+
+ assert(s);
+
+ l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
+ if (l < 0) {
+
+ if (errno == EAGAIN)
+ return 0;
+
+ log_warning("Failed to read from stream: %m");
+ return -errno;
+ }
+
+ if (l == 0) {
+ r = stdout_stream_scan(s, true);
+ if (r < 0)
+ return r;
+
+ return 0;
+ }
+
+ s->length += l;
+ r = stdout_stream_scan(s, false);
+ if (r < 0)
+ return r;
+
+ return 1;
+
+}
+
+static void stdout_stream_free(StdoutStream *s) {
+ assert(s);
+
+ if (s->server) {
+ assert(s->server->n_stdout_streams > 0);
+ s->server->n_stdout_streams --;
+ LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
+ }
+
+ if (s->fd >= 0) {
+ if (s->server)
+ epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
+
+ close_nointr_nofail(s->fd);
+ }
+
+#ifdef HAVE_SELINUX
+ if (s->security_context)
+ freecon(s->security_context);
+#endif
+
+ free(s->identifier);
+ free(s);
+}
+
+static int stdout_stream_new(Server *s) {
+ StdoutStream *stream;
+ int fd, r;
+ socklen_t len;
+ struct epoll_event ev;
+
+ assert(s);
+
+ fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
+ if (fd < 0) {
+ if (errno == EAGAIN)
+ return 0;
+
+ log_error("Failed to accept stdout connection: %m");
+ return -errno;
+ }
+
+ if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
+ log_warning("Too many stdout streams, refusing connection.");
+ close_nointr_nofail(fd);
+ return 0;
+ }
+
+ stream = new0(StdoutStream, 1);
+ if (!stream) {
+ log_error("Out of memory.");
+ close_nointr_nofail(fd);
+ return -ENOMEM;
+ }
+
+ stream->fd = fd;
+
+ len = sizeof(stream->ucred);
+ if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
+ log_error("Failed to determine peer credentials: %m");
+ r = -errno;
+ goto fail;
+ }
+
+#ifdef HAVE_SELINUX
+ if (getpeercon(fd, &stream->security_context) < 0 && errno != ENOPROTOOPT)
+ log_error("Failed to determine peer security context: %m");
+#endif
+
+ if (shutdown(fd, SHUT_WR) < 0) {
+ log_error("Failed to shutdown writing side of socket: %m");
+ r = -errno;
+ goto fail;
+ }
+
+ zero(ev);
+ ev.data.ptr = stream;
+ ev.events = EPOLLIN;
+ if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
+ log_error("Failed to add stream to event loop: %m");
+ r = -errno;
+ goto fail;
+ }
+
+ stream->server = s;
+ LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
+ s->n_stdout_streams ++;
+
+ return 0;
+
+fail:
+ stdout_stream_free(stream);
+ return r;
+}
+
+static int parse_kernel_timestamp(char **_p, usec_t *t) {
+ usec_t r;
+ int k, i;
+ char *p;
+
+ assert(_p);
+ assert(*_p);
+ assert(t);
+
+ p = *_p;
+
+ if (strlen(p) < 14 || p[0] != '[' || p[13] != ']' || p[6] != '.')
+ return 0;
+
+ r = 0;
+
+ for (i = 1; i <= 5; i++) {
+ r *= 10;
+
+ if (p[i] == ' ')
+ continue;
+
+ k = undecchar(p[i]);
+ if (k < 0)
+ return 0;
+
+ r += k;
+ }
+
+ for (i = 7; i <= 12; i++) {
+ r *= 10;
+
+ k = undecchar(p[i]);
+ if (k < 0)
+ return 0;
+
+ r += k;
+ }
+
+ *t = r;
+ *_p += 14;
+ *_p += strspn(*_p, WHITESPACE);
+
+ return 1;
+}
+
+static bool is_us(const char *pid) {
+ pid_t t;
+
+ assert(pid);
+
+ if (parse_pid(pid, &t) < 0)
+ return false;
+
+ return t == getpid();
+}
+
+static void proc_kmsg_line(Server *s, const char *p) {
+ struct iovec iovec[N_IOVEC_META_FIELDS + 7];
+ char *message = NULL, *syslog_priority = NULL, *syslog_pid = NULL, *syslog_facility = NULL, *syslog_identifier = NULL, *source_time = NULL;
+ int priority = LOG_KERN | LOG_INFO;
+ unsigned n = 0;
+ usec_t usec;
+ char *identifier = NULL, *pid = NULL;
+
+ assert(s);
+ assert(p);
+
+ if (isempty(p))
+ return;
+
+ parse_syslog_priority((char **) &p, &priority);
+
+ if (s->forward_to_kmsg && (priority & LOG_FACMASK) != LOG_KERN)
+ return;
+
+ if (parse_kernel_timestamp((char **) &p, &usec) > 0) {
+ if (asprintf(&source_time, "_SOURCE_MONOTONIC_TIMESTAMP=%llu",
+ (unsigned long long) usec) >= 0)
+ IOVEC_SET_STRING(iovec[n++], source_time);
+ }
+
+ IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=kernel");
+
+ if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
+ IOVEC_SET_STRING(iovec[n++], syslog_priority);
+
+ if ((priority & LOG_FACMASK) == LOG_KERN) {
+
+ if (s->forward_to_syslog)
+ forward_syslog(s, priority, "kernel", p, NULL, NULL);
+
+ IOVEC_SET_STRING(iovec[n++], "SYSLOG_IDENTIFIER=kernel");
+ } else {
+ read_identifier(&p, &identifier, &pid);
+
+ /* Avoid any messages we generated ourselves via
+ * log_info() and friends. */
+ if (pid && is_us(pid))
+ goto finish;
+
+ if (s->forward_to_syslog)
+ forward_syslog(s, priority, identifier, p, NULL, NULL);
+
+ if (identifier) {
+ syslog_identifier = strappend("SYSLOG_IDENTIFIER=", identifier);
+ if (syslog_identifier)
+ IOVEC_SET_STRING(iovec[n++], syslog_identifier);
+ }
+
+ if (pid) {
+ syslog_pid = strappend("SYSLOG_PID=", pid);
+ if (syslog_pid)
+ IOVEC_SET_STRING(iovec[n++], syslog_pid);
+ }
+
+ if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
+ IOVEC_SET_STRING(iovec[n++], syslog_facility);
+ }
+
+ message = strappend("MESSAGE=", p);
+ if (message)
+ IOVEC_SET_STRING(iovec[n++], message);
+
+ dispatch_message(s, iovec, n, ELEMENTSOF(iovec), NULL, NULL, NULL, 0, NULL, priority);
+
+finish:
+ free(message);
+ free(syslog_priority);
+ free(syslog_identifier);
+ free(syslog_pid);
+ free(syslog_facility);
+ free(source_time);
+ free(identifier);
+ free(pid);
+}
+
+static void proc_kmsg_scan(Server *s) {
+ char *p;
+ size_t remaining;
+
+ assert(s);
+
+ p = s->proc_kmsg_buffer;
+ remaining = s->proc_kmsg_length;
+ for (;;) {
+ char *end;
+ size_t skip;
+
+ end = memchr(p, '\n', remaining);
+ if (end)
+ skip = end - p + 1;
+ else if (remaining >= sizeof(s->proc_kmsg_buffer) - 1) {
+ end = p + sizeof(s->proc_kmsg_buffer) - 1;
+ skip = remaining;
+ } else
+ break;
+
+ *end = 0;
+
+ proc_kmsg_line(s, p);
+
+ remaining -= skip;
+ p += skip;
+ }
+
+ if (p > s->proc_kmsg_buffer) {
+ memmove(s->proc_kmsg_buffer, p, remaining);
+ s->proc_kmsg_length = remaining;
+ }
+}
+
+static int system_journal_open(Server *s) {
+ int r;
+ char *fn;
+ sd_id128_t machine;
+ char ids[33];
+
+ r = sd_id128_get_machine(&machine);
+ if (r < 0)
+ return r;
+
+ sd_id128_to_string(machine, ids);
+
+ if (!s->system_journal &&
+ (s->storage == STORAGE_PERSISTENT || s->storage == STORAGE_AUTO) &&
+ access("/run/systemd/journal/flushed", F_OK) >= 0) {
+
+ /* If in auto mode: first try to create the machine
+ * path, but not the prefix.
+ *
+ * If in persistent mode: create /var/log/journal and
+ * the machine path */
+
+ if (s->storage == STORAGE_PERSISTENT)
+ (void) mkdir("/var/log/journal/", 0755);
+
+ fn = strappend("/var/log/journal/", ids);
+ if (!fn)
+ return -ENOMEM;
+
+ (void) mkdir(fn, 0755);
+ free(fn);
+
+ fn = strjoin("/var/log/journal/", ids, "/system.journal", NULL);
+ if (!fn)
+ return -ENOMEM;
+
+ r = journal_file_open_reliably(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
+ free(fn);
+
+ if (r >= 0) {
+ journal_default_metrics(&s->system_metrics, s->system_journal->fd);
+
+ s->system_journal->metrics = s->system_metrics;
+ s->system_journal->compress = s->compress;
+
+ server_fix_perms(s, s->system_journal, 0);
+ } else if (r < 0) {
+
+ if (r != -ENOENT && r != -EROFS)
+ log_warning("Failed to open system journal: %s", strerror(-r));
+
+ r = 0;
+ }
+ }
+
+ if (!s->runtime_journal &&
+ (s->storage != STORAGE_NONE)) {
+
+ fn = strjoin("/run/log/journal/", ids, "/system.journal", NULL);
+ if (!fn)
+ return -ENOMEM;
+
+ if (s->system_journal) {
+
+ /* Try to open the runtime journal, but only
+ * if it already exists, so that we can flush
+ * it into the system journal */
+
+ r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
+ free(fn);
+
+ if (r < 0) {
+ if (r != -ENOENT)
+ log_warning("Failed to open runtime journal: %s", strerror(-r));
+
+ r = 0;
+ }
+
+ } else {
+
+ /* OK, we really need the runtime journal, so create
+ * it if necessary. */
+
+ (void) mkdir_parents(fn, 0755);
+ r = journal_file_open_reliably(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
+ free(fn);
+
+ if (r < 0) {
+ log_error("Failed to open runtime journal: %s", strerror(-r));
+ return r;
+ }
+ }
+
+ if (s->runtime_journal) {
+ journal_default_metrics(&s->runtime_metrics, s->runtime_journal->fd);
+
+ s->runtime_journal->metrics = s->runtime_metrics;
+ s->runtime_journal->compress = s->compress;
+
+ server_fix_perms(s, s->runtime_journal, 0);
+ }
+ }
+
+ return r;