+
+ path = shortened_cgroup_path(ucred->pid);
+ if (path) {
+ cgroup = strappend("_SYSTEMD_CGROUP=", path);
+ if (cgroup)
+ IOVEC_SET_STRING(iovec[n++], cgroup);
+
+ free(path);
+ }
+ }
+
+ if (tv) {
+ if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
+ (unsigned long long) timeval_load(tv)) >= 0)
+ IOVEC_SET_STRING(iovec[n++], source_time);
+ }
+
+ /* Note that strictly speaking storing the boot id here is
+ * redundant since the entry includes this in-line
+ * anyway. However, we need this indexed, too. */
+ r = sd_id128_get_boot(&id);
+ if (r >= 0)
+ if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
+ IOVEC_SET_STRING(iovec[n++], boot_id);
+
+ r = sd_id128_get_machine(&id);
+ if (r >= 0)
+ if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
+ IOVEC_SET_STRING(iovec[n++], machine_id);
+
+ t = gethostname_malloc();
+ if (t) {
+ hostname = strappend("_HOSTNAME=", t);
+ if (hostname)
+ IOVEC_SET_STRING(iovec[n++], hostname);
+ free(t);
+ }
+
+ assert(n <= m);
+
+retry:
+ f = find_journal(s, realuid == 0 ? 0 : loginuid);
+ if (!f)
+ log_warning("Dropping message, as we can't find a place to store the data.");
+ else {
+ r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
+
+ if (r == -E2BIG && !vacuumed) {
+ log_info("Allocation limit reached.");
+
+ server_vacuum(s);
+ vacuumed = true;
+
+ log_info("Retrying write.");
+ goto retry;
+ }
+
+ if (r < 0)
+ log_error("Failed to write entry, ignoring: %s", strerror(-r));
+ }
+
+ free(pid);
+ free(uid);
+ free(gid);
+ free(comm);
+ free(exe);
+ free(cmdline);
+ free(source_time);
+ free(boot_id);
+ free(machine_id);
+ free(hostname);
+ free(audit_session);
+ free(audit_loginuid);
+ free(cgroup);
+}
+
+static void dispatch_message(Server *s,
+ struct iovec *iovec, unsigned n, unsigned m,
+ struct ucred *ucred,
+ struct timeval *tv,
+ int priority) {
+ int rl;
+ char *path, *c;
+
+ assert(s);
+ assert(iovec || n == 0);
+
+ if (n == 0)
+ return;
+
+ if (!ucred)
+ goto finish;
+
+ path = shortened_cgroup_path(ucred->pid);
+ if (!path)
+ goto finish;
+
+ /* example: /user/lennart/3/foobar
+ * /system/dbus.service/foobar
+ *
+ * So let's cut of everything past the third /, since that is
+ * wher user directories start */
+
+ c = strchr(path, '/');
+ if (c) {
+ c = strchr(c+1, '/');
+ if (c) {
+ c = strchr(c+1, '/');
+ if (c)
+ *c = 0;
+ }
+ }
+
+ rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
+
+ if (rl == 0) {
+ free(path);
+ return;
+ }
+
+ if (rl > 1) {
+ int j = 0;
+ char suppress_message[LINE_MAX];
+ struct iovec suppress_iovec[15];
+
+ /* Write a suppression message if we suppressed something */
+
+ snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
+ char_array_0(suppress_message);
+
+ IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
+ IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
+
+ dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
+ }
+
+ free(path);
+
+finish:
+ dispatch_message_real(s, iovec, n, m, ucred, tv);
+}
+
+static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
+ char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
+ struct iovec iovec[16];
+ unsigned n = 0;
+ int priority = LOG_USER | LOG_INFO;
+
+ assert(s);
+ assert(buf);
+
+ parse_syslog_priority((char**) &buf, &priority);
+ skip_syslog_date((char**) &buf);
+
+ if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
+ IOVEC_SET_STRING(iovec[n++], syslog_priority);
+
+ if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
+ IOVEC_SET_STRING(iovec[n++], syslog_facility);
+
+ message = strappend("MESSAGE=", buf);
+ if (message)
+ IOVEC_SET_STRING(iovec[n++], message);
+
+ dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
+
+ free(message);
+ free(syslog_facility);
+ free(syslog_priority);
+}
+
+static bool valid_user_field(const char *p, size_t l) {
+ const char *a;
+
+ /* We kinda enforce POSIX syntax recommendations for
+ environment variables here, but make a couple of additional
+ requirements.
+
+ http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */
+
+ /* No empty field names */
+ if (l <= 0)
+ return false;
+
+ /* Don't allow names longer than 64 chars */
+ if (l > 64)
+ return false;
+
+ /* Variables starting with an underscore are protected */
+ if (p[0] == '_')
+ return false;
+
+ /* Don't allow digits as first character */
+ if (p[0] >= '0' && p[0] <= '9')
+ return false;
+
+ /* Only allow A-Z0-9 and '_' */
+ for (a = p; a < p + l; a++)
+ if (!((*a >= 'A' && *a <= 'Z') ||
+ (*a >= '0' && *a <= '9') ||
+ *a == '_'))
+ return false;
+
+ return true;
+}
+
+static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
+ struct iovec *iovec = NULL;
+ unsigned n = 0, m = 0, j;
+ const char *p;
+ size_t remaining;
+ int priority = LOG_INFO;
+
+ assert(s);
+ assert(buffer || n == 0);
+
+ p = buffer;
+ remaining = buffer_size;
+
+ while (remaining > 0) {
+ const char *e, *q;
+
+ e = memchr(p, '\n', remaining);
+
+ if (!e) {
+ /* Trailing noise, let's ignore it, and flush what we collected */
+ log_debug("Received message with trailing noise, ignoring.");
+ break;
+ }
+
+ if (e == p) {
+ /* Entry separator */
+ dispatch_message(s, iovec, n, m, ucred, tv, priority);
+ n = 0;
+ priority = LOG_INFO;
+
+ p++;
+ remaining--;
+ continue;
+ }
+
+ if (*p == '.' || *p == '#') {
+ /* Ignore control commands for now, and
+ * comments too. */
+ remaining -= (e - p) + 1;
+ p = e + 1;
+ continue;
+ }
+
+ /* A property follows */
+
+ if (n+13 >= m) {
+ struct iovec *c;
+ unsigned u;
+
+ u = MAX((n+13U) * 2U, 4U);
+ c = realloc(iovec, u * sizeof(struct iovec));
+ if (!c) {
+ log_error("Out of memory");
+ break;
+ }
+
+ iovec = c;
+ m = u;
+ }
+
+ q = memchr(p, '=', e - p);
+ if (q) {
+ if (valid_user_field(p, q - p)) {
+ /* If the field name starts with an
+ * underscore, skip the variable,
+ * since that indidates a trusted
+ * field */
+ iovec[n].iov_base = (char*) p;
+ iovec[n].iov_len = e - p;
+ n++;
+
+ /* We need to determine the priority
+ * of this entry for the rate limiting
+ * logic */
+ if (e - p == 10 &&
+ memcmp(p, "PRIORITY=", 10) == 0 &&
+ p[10] >= '0' &&
+ p[10] <= '9')
+ priority = p[10] - '0';
+ }
+
+ remaining -= (e - p) + 1;
+ p = e + 1;
+ continue;
+ } else {
+ uint64_t l;
+ char *k;
+
+ if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
+ log_debug("Failed to parse message, ignoring.");
+ break;
+ }
+
+ memcpy(&l, e + 1, sizeof(uint64_t));
+ l = le64toh(l);
+
+ if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
+ e[1+sizeof(uint64_t)+l] != '\n') {
+ log_debug("Failed to parse message, ignoring.");
+ break;
+ }
+
+ k = malloc((e - p) + 1 + l);
+ if (!k) {
+ log_error("Out of memory");
+ break;
+ }
+
+ memcpy(k, p, e - p);
+ k[e - p] = '=';
+ memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
+
+ if (valid_user_field(p, e - p)) {
+ iovec[n].iov_base = k;
+ iovec[n].iov_len = (e - p) + 1 + l;
+ n++;
+ } else
+ free(k);
+
+ remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
+ p = e + 1 + sizeof(uint64_t) + l + 1;
+ }
+ }
+
+ dispatch_message(s, iovec, n, m, ucred, tv, priority);
+
+ for (j = 0; j < n; j++)
+ if (iovec[j].iov_base < buffer ||
+ (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
+ free(iovec[j].iov_base);
+}
+
+static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
+ struct iovec iovec[15];
+ char *message = NULL, *syslog_priority = NULL;
+ unsigned n = 0;
+ size_t tag_len;
+ int priority;
+
+ assert(s);
+ assert(p);
+
+ priority = s->priority;
+
+ if (s->priority_prefix &&
+ l > 3 &&
+ p[0] == '<' &&
+ p[1] >= '0' && p[1] <= '7' &&
+ p[2] == '>') {
+
+ priority = p[1] - '0';
+ p += 3;
+ l -= 3;
+ }
+
+ if (l <= 0)
+ return 0;
+
+ if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
+ IOVEC_SET_STRING(iovec[n++], syslog_priority);
+
+ tag_len = s->tag ? strlen(s->tag) + 2: 0;
+ message = malloc(8 + tag_len + l);
+ if (message) {
+ memcpy(message, "MESSAGE=", 8);
+
+ if (s->tag) {
+ memcpy(message+8, s->tag, tag_len-2);
+ memcpy(message+8+tag_len-2, ": ", 2);
+ }
+
+ memcpy(message+8+tag_len, p, l);
+ iovec[n].iov_base = message;
+ iovec[n].iov_len = 8+tag_len+l;
+ n++;
+ }
+
+ dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
+
+ if (s->tee_console) {
+ int console;
+
+ console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
+ if (console >= 0) {
+ n = 0;
+ if (s->tag) {
+ IOVEC_SET_STRING(iovec[n++], s->tag);
+ IOVEC_SET_STRING(iovec[n++], ": ");
+ }
+
+ iovec[n].iov_base = (void*) p;
+ iovec[n].iov_len = l;
+ n++;
+
+ IOVEC_SET_STRING(iovec[n++], (char*) "\n");
+
+ writev(console, iovec, n);
+ }
+ }
+
+ free(message);
+ free(syslog_priority);
+
+ return 0;
+}
+
+static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
+ assert(s);
+ assert(p);
+
+ while (l > 0 && strchr(WHITESPACE, *p)) {
+ l--;
+ p++;
+ }
+
+ while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
+ l--;
+
+ switch (s->state) {
+
+ case STDOUT_STREAM_TAG:
+
+ if (l > 0) {
+ s->tag = strndup(p, l);
+ if (!s->tag) {
+ log_error("Out of memory");
+ return -EINVAL;
+ }
+ }
+
+ s->state = STDOUT_STREAM_PRIORITY;
+ return 0;
+
+ case STDOUT_STREAM_PRIORITY:
+ if (l != 1 || *p < '0' || *p > '7') {
+ log_warning("Failed to parse log priority line.");
+ return -EINVAL;
+ }
+
+ s->priority = *p - '0';
+ s->state = STDOUT_STREAM_PRIORITY_PREFIX;
+ return 0;
+
+ case STDOUT_STREAM_PRIORITY_PREFIX:
+ if (l != 1 || *p < '0' || *p > '1') {
+ log_warning("Failed to parse priority prefix line.");
+ return -EINVAL;
+ }
+
+ s->priority_prefix = *p - '0';
+ s->state = STDOUT_STREAM_TEE_CONSOLE;
+ return 0;
+
+ case STDOUT_STREAM_TEE_CONSOLE:
+ if (l != 1 || *p < '0' || *p > '1') {
+ log_warning("Failed to parse tee to console line.");
+ return -EINVAL;
+ }
+
+ s->tee_console = *p - '0';
+ s->state = STDOUT_STREAM_RUNNING;
+ return 0;
+
+ case STDOUT_STREAM_RUNNING:
+ return stdout_stream_log(s, p, l);
+ }
+
+ assert_not_reached("Unknown stream state");
+}
+
+static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
+ char *p;
+ size_t remaining;
+ int r;
+
+ assert(s);
+
+ p = s->buffer;
+ remaining = s->length;
+ for (;;) {
+ char *end;
+ size_t skip;
+
+ end = memchr(p, '\n', remaining);
+ if (!end) {
+ if (remaining >= LINE_MAX) {
+ end = p + LINE_MAX;
+ skip = LINE_MAX;
+ } else
+ break;
+ } else
+ skip = end - p + 1;
+
+ r = stdout_stream_line(s, p, end - p);
+ if (r < 0)
+ return r;
+
+ remaining -= skip;
+ p += skip;
+ }
+
+ if (force_flush && remaining > 0) {
+ r = stdout_stream_line(s, p, remaining);
+ if (r < 0)
+ return r;
+
+ p += remaining;
+ remaining = 0;
+ }
+
+ if (p > s->buffer) {
+ memmove(s->buffer, p, remaining);
+ s->length = remaining;
+ }
+
+ return 0;
+}
+
+static int stdout_stream_process(StdoutStream *s) {
+ ssize_t l;
+ int r;
+
+ assert(s);
+
+ l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
+ if (l < 0) {
+
+ if (errno == EAGAIN)
+ return 0;
+
+ log_warning("Failed to read from stream: %m");
+ return -errno;
+ }
+
+ if (l == 0) {
+ r = stdout_stream_scan(s, true);
+ if (r < 0)
+ return r;
+
+ return 0;
+ }
+
+ s->length += l;
+ r = stdout_stream_scan(s, false);
+ if (r < 0)
+ return r;
+
+ return 1;
+
+}
+
+static void stdout_stream_free(StdoutStream *s) {
+ assert(s);
+
+ if (s->server) {
+ assert(s->server->n_stdout_streams > 0);
+ s->server->n_stdout_streams --;
+ LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
+ }
+
+ if (s->fd >= 0) {
+ if (s->server)
+ epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
+
+ close_nointr_nofail(s->fd);
+ }
+
+ free(s->tag);
+ free(s);
+}
+
+static int stdout_stream_new(Server *s) {
+ StdoutStream *stream;
+ int fd, r;
+ socklen_t len;
+ struct epoll_event ev;
+
+ assert(s);
+
+ fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
+ if (fd < 0) {
+ if (errno == EAGAIN)
+ return 0;
+
+ log_error("Failed to accept stdout connection: %m");
+ return -errno;
+ }
+
+ if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
+ log_warning("Too many stdout streams, refusing connection.");
+ close_nointr_nofail(fd);
+ return 0;
+ }
+
+ stream = new0(StdoutStream, 1);
+ if (!stream) {
+ log_error("Out of memory.");
+ close_nointr_nofail(fd);
+ return -ENOMEM;