X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?p=elogind.git;a=blobdiff_plain;f=src%2Fjournal%2Fjournald.c;h=1efe0420dbdb869b35a12841c3f6d95023c333a2;hp=d35e1c119ab5984604471f1ed08b240ecadc93c7;hb=cf244689e9d1ab50082c9ddd0f3c4d1eb982badc;hpb=6ad1d1c30621280bfad3e63fcc1c7ceb7d8ffa98 diff --git a/src/journal/journald.c b/src/journal/journald.c index d35e1c119..1efe0420d 100644 --- a/src/journal/journald.c +++ b/src/journal/journald.c @@ -30,6 +30,7 @@ #include #include #include +#include #include "hashmap.h" #include "journal-file.h" @@ -37,14 +38,27 @@ #include "socket-util.h" #include "acl-util.h" #include "cgroup-util.h" +#include "list.h" +#include "journal-rate-limit.h" +#include "sd-journal.h" +#include "journal-internal.h" #define USER_JOURNALS_MAX 1024 +#define STDOUT_STREAMS_MAX 4096 + +#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC) +#define DEFAULT_RATE_LIMIT_BURST 200 + +#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC) + +typedef struct StdoutStream StdoutStream; typedef struct Server { int epoll_fd; int signal_fd; int syslog_fd; int native_fd; + int stdout_fd; JournalFile *runtime_journal; JournalFile *system_journal; @@ -55,11 +69,125 @@ typedef struct Server { char *buffer; size_t buffer_size; + JournalRateLimit *rate_limit; + JournalMetrics metrics; uint64_t max_use; bool compress; + + uint64_t cached_available_space; + usec_t cached_available_space_timestamp; + + LIST_HEAD(StdoutStream, stdout_streams); + unsigned n_stdout_streams; } Server; +typedef enum StdoutStreamState { + STDOUT_STREAM_TAG, + STDOUT_STREAM_PRIORITY, + STDOUT_STREAM_PRIORITY_PREFIX, + STDOUT_STREAM_TEE_CONSOLE, + STDOUT_STREAM_RUNNING +} StdoutStreamState; + +struct StdoutStream { + Server *server; + StdoutStreamState state; + + int fd; + + struct ucred ucred; + + char *tag; + int priority; + bool priority_prefix:1; + bool tee_console:1; + + char buffer[LINE_MAX+1]; + size_t length; + + LIST_FIELDS(StdoutStream, stdout_stream); +}; + +static int server_flush_to_var(Server *s); + +static uint64_t available_space(Server *s) { + char ids[33]; + sd_id128_t machine; + char *p; + const char *f; + struct statvfs ss; + uint64_t sum = 0, avail = 0, ss_avail = 0; + int r; + DIR *d; + usec_t ts = now(CLOCK_MONOTONIC); + + if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts) + return s->cached_available_space; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return 0; + + if (s->system_journal) + f = "/var/log/journal/"; + else + f = "/run/log/journal/"; + + p = strappend(f, sd_id128_to_string(machine, ids)); + if (!p) + return 0; + + d = opendir(p); + free(p); + + if (!d) + return 0; + + if (fstatvfs(dirfd(d), &ss) < 0) + goto finish; + + for (;;) { + struct stat st; + struct dirent buf, *de; + int k; + + k = readdir_r(d, &buf, &de); + if (k != 0) { + r = -k; + goto finish; + } + + if (!de) + break; + + if (!dirent_is_file_with_suffix(de, ".journal")) + continue; + + if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) + continue; + + sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize; + } + + avail = sum >= s->max_use ? 0 : s->max_use - sum; + + ss_avail = ss.f_bsize * ss.f_bavail; + + ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free; + + if (ss_avail < avail) + avail = ss_avail; + + s->cached_available_space = avail; + s->cached_available_space_timestamp = ts; + +finish: + closedir(d); + + return avail; +} + static void fix_perms(JournalFile *f, uid_t uid) { acl_t acl; acl_entry_t entry; @@ -115,8 +243,12 @@ static JournalFile* find_journal(Server *s, uid_t uid) { assert(s); - /* We split up user logs only on /var, not on /run */ - if (!s->system_journal) + /* We split up user logs only on /var, not on /run. If the + * runtime file is open, we write to it exclusively, in order + * to guarantee proper order as soon as we flush /run to + * /var and close the runtime file. */ + + if (s->runtime_journal) return s->runtime_journal; if (uid <= 0) @@ -217,9 +349,44 @@ static void server_vacuum(Server *s) { if (r < 0 && r != -ENOENT) log_error("Failed to vacuum %s: %s", p, strerror(-r)); free(p); + + s->cached_available_space_timestamp = 0; +} + +static char *shortened_cgroup_path(pid_t pid) { + int r; + char *process_path, *init_path, *path; + + assert(pid > 0); + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path); + if (r < 0) + return NULL; + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path); + if (r < 0) { + free(process_path); + return NULL; + } + + if (streq(init_path, "/")) + init_path[0] = 0; + + if (startswith(process_path, init_path)) + path = process_path + strlen(init_path); + else + path = process_path; + + free(init_path); + + return path; } -static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigned m, struct ucred *ucred, struct timeval *tv) { +static void dispatch_message_real(Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv) { + char *pid = NULL, *uid = NULL, *gid = NULL, *source_time = NULL, *boot_id = NULL, *machine_id = NULL, *comm = NULL, *cmdline = NULL, *hostname = NULL, @@ -235,11 +402,8 @@ static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigne bool vacuumed = false; assert(s); - assert(iovec || n == 0); - - if (n == 0) - return; - + assert(iovec); + assert(n > 0); assert(n + 13 <= m); if (ucred) { @@ -291,11 +455,12 @@ static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigne if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0) IOVEC_SET_STRING(iovec[n++], audit_loginuid); - r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, ucred->pid, &path); - if (r >= 0) { + path = shortened_cgroup_path(ucred->pid); + if (path) { cgroup = strappend("_SYSTEMD_CGROUP=", path); if (cgroup) IOVEC_SET_STRING(iovec[n++], cgroup); + free(path); } } @@ -329,6 +494,8 @@ static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigne assert(n <= m); + server_flush_to_var(s); + retry: f = find_journal(s, realuid == 0 ? 0 : loginuid); if (!f) @@ -363,7 +530,72 @@ retry: free(audit_session); free(audit_loginuid); free(cgroup); +} + +static void dispatch_message(Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv, + int priority) { + int rl; + char *path, *c; + + assert(s); + assert(iovec || n == 0); + + if (n == 0) + return; + + if (!ucred) + goto finish; + + path = shortened_cgroup_path(ucred->pid); + if (!path) + goto finish; + + /* example: /user/lennart/3/foobar + * /system/dbus.service/foobar + * + * So let's cut of everything past the third /, since that is + * wher user directories start */ + + c = strchr(path, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) + *c = 0; + } + } + rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s)); + + if (rl == 0) { + free(path); + return; + } + + if (rl > 1) { + int j = 0; + char suppress_message[LINE_MAX]; + struct iovec suppress_iovec[15]; + + /* Write a suppression message if we suppressed something */ + + snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path); + char_array_0(suppress_message); + + IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5"); + IOVEC_SET_STRING(suppress_iovec[j++], suppress_message); + + dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL); + } + + free(path); + +finish: + dispatch_message_real(s, iovec, n, m, ucred, tv); } static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) { @@ -388,7 +620,7 @@ static void process_syslog_message(Server *s, const char *buf, struct ucred *ucr if (message) IOVEC_SET_STRING(iovec[n++], message); - dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv); + dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK); free(message); free(syslog_facility); @@ -435,6 +667,7 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ unsigned n = 0, m = 0, j; const char *p; size_t remaining; + int priority = LOG_INFO; assert(s); assert(buffer || n == 0); @@ -455,8 +688,9 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ if (e == p) { /* Entry separator */ - dispatch_message(s, iovec, n, m, ucred, tv); + dispatch_message(s, iovec, n, m, ucred, tv, priority); n = 0; + priority = LOG_INFO; p++; remaining--; @@ -498,6 +732,15 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ iovec[n].iov_base = (char*) p; iovec[n].iov_len = e - p; n++; + + /* We need to determine the priority + * of this entry for the rate limiting + * logic */ + if (e - p == 10 && + memcmp(p, "PRIORITY=", 10) == 0 && + p[10] >= '0' && + p[10] <= '9') + priority = p[10] - '0'; } remaining -= (e - p) + 1; @@ -543,7 +786,7 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ } } - dispatch_message(s, iovec, n, m, ucred, tv); + dispatch_message(s, iovec, n, m, ucred, tv, priority); for (j = 0; j < n; j++) if (iovec[j].iov_base < buffer || @@ -551,192 +794,642 @@ static void process_native_message(Server *s, const void *buffer, size_t buffer_ free(iovec[j].iov_base); } -static int process_event(Server *s, struct epoll_event *ev) { +static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) { + struct iovec iovec[15]; + char *message = NULL, *syslog_priority = NULL; + unsigned n = 0; + size_t tag_len; + int priority; + assert(s); + assert(p); - if (ev->events != EPOLLIN) { - log_info("Got invalid event from epoll."); - return -EIO; - } + priority = s->priority; - if (ev->data.fd == s->signal_fd) { - struct signalfd_siginfo sfsi; - ssize_t n; + if (s->priority_prefix && + l > 3 && + p[0] == '<' && + p[1] >= '0' && p[1] <= '7' && + p[2] == '>') { - n = read(s->signal_fd, &sfsi, sizeof(sfsi)); - if (n != sizeof(sfsi)) { + priority = p[1] - '0'; + p += 3; + l -= 3; + } - if (n >= 0) - return -EIO; + if (l <= 0) + return 0; - if (errno == EINTR || errno == EAGAIN) - return 0; + if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); - return -errno; - } + tag_len = s->tag ? strlen(s->tag) + 2: 0; + message = malloc(8 + tag_len + l); + if (message) { + memcpy(message, "MESSAGE=", 8); - log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo)); - return 0; + if (s->tag) { + memcpy(message+8, s->tag, tag_len-2); + memcpy(message+8+tag_len-2, ": ", 2); + } + memcpy(message+8+tag_len, p, l); + iovec[n].iov_base = message; + iovec[n].iov_len = 8+tag_len+l; + n++; } - if (ev->data.fd == s->native_fd || - ev->data.fd == s->syslog_fd) { - for (;;) { - struct msghdr msghdr; - struct iovec iovec; - struct ucred *ucred = NULL; - struct timeval *tv = NULL; - struct cmsghdr *cmsg; - union { - struct cmsghdr cmsghdr; - uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) + - CMSG_SPACE(sizeof(struct timeval))]; - } control; - ssize_t n; - int v; + dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority); - if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) { - log_error("SIOCINQ failed: %m"); - return -errno; + if (s->tee_console) { + int console; + + console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC); + if (console >= 0) { + n = 0; + if (s->tag) { + IOVEC_SET_STRING(iovec[n++], s->tag); + IOVEC_SET_STRING(iovec[n++], ": "); } - if (v <= 0) - return 1; + iovec[n].iov_base = (void*) p; + iovec[n].iov_len = l; + n++; - if (s->buffer_size < (size_t) v) { - void *b; - size_t l; + IOVEC_SET_STRING(iovec[n++], (char*) "\n"); - l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2); - b = realloc(s->buffer, l+1); + writev(console, iovec, n); + } + } - if (!b) { - log_error("Couldn't increase buffer."); - return -ENOMEM; - } + free(message); + free(syslog_priority); - s->buffer_size = l; - s->buffer = b; - } + return 0; +} - zero(iovec); - iovec.iov_base = s->buffer; - iovec.iov_len = s->buffer_size; +static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) { + assert(s); + assert(p); - zero(control); - zero(msghdr); - msghdr.msg_iov = &iovec; - msghdr.msg_iovlen = 1; - msghdr.msg_control = &control; - msghdr.msg_controllen = sizeof(control); + while (l > 0 && strchr(WHITESPACE, *p)) { + l--; + p++; + } - n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT); - if (n < 0) { + while (l > 0 && strchr(WHITESPACE, *(p+l-1))) + l--; - if (errno == EINTR || errno == EAGAIN) - return 1; + switch (s->state) { - log_error("recvmsg() failed: %m"); - return -errno; + case STDOUT_STREAM_TAG: + + if (l > 0) { + s->tag = strndup(p, l); + if (!s->tag) { + log_error("Out of memory"); + return -EINVAL; } + } - for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) { + s->state = STDOUT_STREAM_PRIORITY; + return 0; - if (cmsg->cmsg_level == SOL_SOCKET && - cmsg->cmsg_type == SCM_CREDENTIALS && - cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) - ucred = (struct ucred*) CMSG_DATA(cmsg); - else if (cmsg->cmsg_level == SOL_SOCKET && - cmsg->cmsg_type == SO_TIMESTAMP && - cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval))) - tv = (struct timeval*) CMSG_DATA(cmsg); - } + case STDOUT_STREAM_PRIORITY: + if (l != 1 || *p < '0' || *p > '7') { + log_warning("Failed to parse log priority line."); + return -EINVAL; + } - if (ev->data.fd == s->syslog_fd) { - char *e; + s->priority = *p - '0'; + s->state = STDOUT_STREAM_PRIORITY_PREFIX; + return 0; - e = memchr(s->buffer, '\n', n); - if (e) - *e = 0; - else - s->buffer[n] = 0; + case STDOUT_STREAM_PRIORITY_PREFIX: + if (l != 1 || *p < '0' || *p > '1') { + log_warning("Failed to parse priority prefix line."); + return -EINVAL; + } - process_syslog_message(s, strstrip(s->buffer), ucred, tv); - } else - process_native_message(s, s->buffer, n, ucred, tv); + s->priority_prefix = *p - '0'; + s->state = STDOUT_STREAM_TEE_CONSOLE; + return 0; + + case STDOUT_STREAM_TEE_CONSOLE: + if (l != 1 || *p < '0' || *p > '1') { + log_warning("Failed to parse tee to console line."); + return -EINVAL; } - return 1; + s->tee_console = *p - '0'; + s->state = STDOUT_STREAM_RUNNING; + return 0; + + case STDOUT_STREAM_RUNNING: + return stdout_stream_log(s, p, l); } - log_error("Unknown event."); - return 0; + assert_not_reached("Unknown stream state"); } -static int system_journal_open(Server *s) { +static int stdout_stream_scan(StdoutStream *s, bool force_flush) { + char *p; + size_t remaining; int r; - char *fn; - sd_id128_t machine; - char ids[33]; - r = sd_id128_get_machine(&machine); - if (r < 0) - return r; + assert(s); - /* First try to create the machine path, but not the prefix */ - fn = strappend("/var/log/journal/", sd_id128_to_string(machine, ids)); - if (!fn) - return -ENOMEM; - (void) mkdir(fn, 0755); - free(fn); + p = s->buffer; + remaining = s->length; + for (;;) { + char *end; + size_t skip; + + end = memchr(p, '\n', remaining); + if (!end) { + if (remaining >= LINE_MAX) { + end = p + LINE_MAX; + skip = LINE_MAX; + } else + break; + } else + skip = end - p + 1; - /* The create the system journal file */ - fn = join("/var/log/journal/", ids, "/system.journal", NULL); - if (!fn) - return -ENOMEM; + r = stdout_stream_line(s, p, end - p); + if (r < 0) + return r; - r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); - free(fn); + remaining -= skip; + p += skip; + } - if (r >= 0) { - s->system_journal->metrics = s->metrics; - s->system_journal->compress = s->compress; + if (force_flush && remaining > 0) { + r = stdout_stream_line(s, p, remaining); + if (r < 0) + return r; - fix_perms(s->system_journal, 0); - return r; + p += remaining; + remaining = 0; } - if (r < 0 && r != -ENOENT) { - log_error("Failed to open system journal: %s", strerror(-r)); - return r; + if (p > s->buffer) { + memmove(s->buffer, p, remaining); + s->length = remaining; } - /* /var didn't work, so try /run, but this time we - * create the prefix too */ - fn = join("/run/log/journal/", ids, "/system.journal", NULL); - if (!fn) - return -ENOMEM; + return 0; +} - (void) mkdir_parents(fn, 0755); - r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); - free(fn); +static int stdout_stream_process(StdoutStream *s) { + ssize_t l; + int r; - if (r < 0) { - log_error("Failed to open runtime journal: %s", strerror(-r)); - return r; - } + assert(s); - s->runtime_journal->metrics = s->metrics; - s->runtime_journal->compress = s->compress; + l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length); + if (l < 0) { - fix_perms(s->runtime_journal, 0); - return r; -} + if (errno == EAGAIN) + return 0; -static int open_syslog_socket(Server *s) { + log_warning("Failed to read from stream: %m"); + return -errno; + } + + if (l == 0) { + r = stdout_stream_scan(s, true); + if (r < 0) + return r; + + return 0; + } + + s->length += l; + r = stdout_stream_scan(s, false); + if (r < 0) + return r; + + return 1; + +} + +static void stdout_stream_free(StdoutStream *s) { + assert(s); + + if (s->server) { + assert(s->server->n_stdout_streams > 0); + s->server->n_stdout_streams --; + LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s); + } + + if (s->fd >= 0) { + if (s->server) + epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL); + + close_nointr_nofail(s->fd); + } + + free(s->tag); + free(s); +} + +static int stdout_stream_new(Server *s) { + StdoutStream *stream; + int fd, r; + socklen_t len; + struct epoll_event ev; + + assert(s); + + fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); + if (fd < 0) { + if (errno == EAGAIN) + return 0; + + log_error("Failed to accept stdout connection: %m"); + return -errno; + } + + if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) { + log_warning("Too many stdout streams, refusing connection."); + close_nointr_nofail(fd); + return 0; + } + + stream = new0(StdoutStream, 1); + if (!stream) { + log_error("Out of memory."); + close_nointr_nofail(fd); + return -ENOMEM; + } + + stream->fd = fd; + + len = sizeof(stream->ucred); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) { + log_error("Failed to determine peer credentials: %m"); + r = -errno; + goto fail; + } + + if (shutdown(fd, SHUT_WR) < 0) { + log_error("Failed to shutdown writing side of socket: %m"); + r = -errno; + goto fail; + } + + zero(ev); + ev.data.ptr = stream; + ev.events = EPOLLIN; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { + log_error("Failed to add stream to event loop: %m"); + r = -errno; + goto fail; + } + + stream->server = s; + LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream); + s->n_stdout_streams ++; + + return 0; + +fail: + stdout_stream_free(stream); + return r; +} + +static int system_journal_open(Server *s) { + int r; + char *fn; + sd_id128_t machine; + char ids[33]; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return r; + + sd_id128_to_string(machine, ids); + + if (!s->system_journal) { + + /* First try to create the machine path, but not the prefix */ + fn = strappend("/var/log/journal/", ids); + if (!fn) + return -ENOMEM; + (void) mkdir(fn, 0755); + free(fn); + + /* The create the system journal file */ + fn = join("/var/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); + free(fn); + + if (r >= 0) { + s->system_journal->metrics = s->metrics; + s->system_journal->compress = s->compress; + + fix_perms(s->system_journal, 0); + } else if (r < 0) { + + if (r == -ENOENT) + r = 0; + else { + log_error("Failed to open system journal: %s", strerror(-r)); + return r; + } + } + } + + if (!s->runtime_journal) { + + fn = join("/run/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + if (s->system_journal) { + + /* Try to open the runtime journal, but only + * if it already exists, so that we can flush + * it into the system journal */ + + r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + + if (r == -ENOENT) + r = 0; + else { + log_error("Failed to open runtime journal: %s", strerror(-r)); + return r; + } + } + + } else { + + /* OK, we really need the runtime journal, so create + * it if necessary. */ + + (void) mkdir_parents(fn, 0755); + r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + log_error("Failed to open runtime journal: %s", strerror(-r)); + return r; + } + } + + if (s->runtime_journal) { + s->runtime_journal->metrics = s->metrics; + s->runtime_journal->compress = s->compress; + + fix_perms(s->runtime_journal, 0); + } + } + + return r; +} + +static int server_flush_to_var(Server *s) { + char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; + Object *o = NULL; + int r; + sd_id128_t machine; + sd_journal *j; + + assert(s); + + system_journal_open(s); + + if (!s->system_journal || !s->runtime_journal) + return 0; + + r = sd_id128_get_machine(&machine); + if (r < 0) { + log_error("Failed to get machine id: %s", strerror(-r)); + return r; + } + + r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY); + if (r < 0) { + log_error("Failed to read runtime journal: %s", strerror(-r)); + return r; + } + + SD_JOURNAL_FOREACH(j) { + JournalFile *f; + + f = j->current_file; + assert(f && f->current_offset > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) { + log_error("Can't read entry: %s", strerror(-r)); + goto finish; + } + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + if (r == -E2BIG) { + log_info("Allocation limit reached."); + + journal_file_post_change(s->system_journal); + server_vacuum(s); + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + } + + if (r < 0) { + log_error("Can't write entry: %s", strerror(-r)); + goto finish; + } + } + +finish: + journal_file_post_change(s->system_journal); + + journal_file_close(s->runtime_journal); + s->runtime_journal = NULL; + + if (r >= 0) { + sd_id128_to_string(machine, path + 17); + rm_rf(path, false, true, false); + } + + return r; +} + +static int process_event(Server *s, struct epoll_event *ev) { + assert(s); + + if (ev->data.fd == s->signal_fd) { + struct signalfd_siginfo sfsi; + ssize_t n; + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + n = read(s->signal_fd, &sfsi, sizeof(sfsi)); + if (n != sizeof(sfsi)) { + + if (n >= 0) + return -EIO; + + if (errno == EINTR || errno == EAGAIN) + return 0; + + return -errno; + } + + if (sfsi.ssi_signo == SIGUSR1) { + server_flush_to_var(s); + return 0; + } + + log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo)); + return 0; + + } else if (ev->data.fd == s->native_fd || + ev->data.fd == s->syslog_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + for (;;) { + struct msghdr msghdr; + struct iovec iovec; + struct ucred *ucred = NULL; + struct timeval *tv = NULL; + struct cmsghdr *cmsg; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) + + CMSG_SPACE(sizeof(struct timeval))]; + } control; + ssize_t n; + int v; + + if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) { + log_error("SIOCINQ failed: %m"); + return -errno; + } + + if (v <= 0) + return 1; + + if (s->buffer_size < (size_t) v) { + void *b; + size_t l; + + l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2); + b = realloc(s->buffer, l+1); + + if (!b) { + log_error("Couldn't increase buffer."); + return -ENOMEM; + } + + s->buffer_size = l; + s->buffer = b; + } + + zero(iovec); + iovec.iov_base = s->buffer; + iovec.iov_len = s->buffer_size; + + zero(control); + zero(msghdr); + msghdr.msg_iov = &iovec; + msghdr.msg_iovlen = 1; + msghdr.msg_control = &control; + msghdr.msg_controllen = sizeof(control); + + n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT); + if (n < 0) { + + if (errno == EINTR || errno == EAGAIN) + return 1; + + log_error("recvmsg() failed: %m"); + return -errno; + } + + for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) { + + if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SCM_CREDENTIALS && + cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) + ucred = (struct ucred*) CMSG_DATA(cmsg); + else if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SO_TIMESTAMP && + cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval))) + tv = (struct timeval*) CMSG_DATA(cmsg); + } + + if (ev->data.fd == s->syslog_fd) { + char *e; + + e = memchr(s->buffer, '\n', n); + if (e) + *e = 0; + else + s->buffer[n] = 0; + + process_syslog_message(s, strstrip(s->buffer), ucred, tv); + } else + process_native_message(s, s->buffer, n, ucred, tv); + } + + return 1; + + } else if (ev->data.fd == s->stdout_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + stdout_stream_new(s); + return 1; + + } else { + StdoutStream *stream; + + if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + /* If it is none of the well-known fds, it must be an + * stdout stream fd. Note that this is a bit ugly here + * (since we rely that none of the well-known fds + * could be interpreted as pointer), but nonetheless + * safe, since the well-known fds would never get an + * fd > 4096, i.e. beyond the first memory page */ + + stream = ev->data.ptr; + + if (stdout_stream_process(stream) <= 0) + stdout_stream_free(stream); + + return 1; + } + + log_error("Unknown event."); + return 0; +} + +static int open_syslog_socket(Server *s) { union sockaddr_union sa; int one, r; + struct epoll_event ev; assert(s); @@ -777,12 +1470,21 @@ static int open_syslog_socket(Server *s) { return -errno; } + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->syslog_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) { + log_error("Failed to add syslog server fd to epoll object: %m"); + return -errno; + } + return 0; } static int open_native_socket(Server*s) { union sockaddr_union sa; int one, r; + struct epoll_event ev; assert(s); @@ -823,24 +1525,110 @@ static int open_native_socket(Server*s) { return -errno; } + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->native_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) { + log_error("Failed to add native server fd to epoll object: %m"); + return -errno; + } + return 0; } -static int server_init(Server *s) { - int n, r, fd; +static int open_stdout_socket(Server *s) { + union sockaddr_union sa; + int r; struct epoll_event ev; + + assert(s); + + if (s->stdout_fd < 0) { + + s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (s->stdout_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + + if (listen(s->stdout_fd, SOMAXCONN) < 0) { + log_error("liste() failed: %m"); + return -errno; + } + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->stdout_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) { + log_error("Failed to add stdout server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_signalfd(Server *s) { sigset_t mask; + struct epoll_event ev; + + assert(s); + + assert_se(sigemptyset(&mask) == 0); + sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1); + assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); + + s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); + if (s->signal_fd < 0) { + log_error("signalfd(): %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->signal_fd; + + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) { + log_error("epoll_ctl(): %m"); + return -errno; + } + + return 0; +} + +static int server_init(Server *s) { + int n, r, fd; assert(s); zero(*s); - s->syslog_fd = s->native_fd = s->signal_fd = -1; + s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1; s->metrics.max_size = DEFAULT_MAX_SIZE; s->metrics.min_size = DEFAULT_MIN_SIZE; s->metrics.keep_free = DEFAULT_KEEP_FREE; s->max_use = DEFAULT_MAX_USE; s->compress = true; + s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func); + if (!s->user_journals) { + log_error("Out of memory."); + return -ENOMEM; + } + s->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (s->epoll_fd < 0) { log_error("Failed to create epoll object: %m"); @@ -855,23 +1643,33 @@ static int server_init(Server *s) { for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) { - if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) { + if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) { - if (s->syslog_fd >= 0) { - log_error("Too many /dev/log sockets passed."); + if (s->native_fd >= 0) { + log_error("Too many native sockets passed."); return -EINVAL; } - s->syslog_fd = fd; + s->native_fd = fd; - } else if (sd_is_socket(fd, AF_UNIX, SOCK_DGRAM, -1) > 0) { + } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) { - if (s->native_fd >= 0) { - log_error("Too many native sockets passed."); + if (s->stdout_fd >= 0) { + log_error("Too many stdout sockets passed."); return -EINVAL; } - s->native_fd = fd; + s->stdout_fd = fd; + + } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) { + + if (s->syslog_fd >= 0) { + log_error("Too many /dev/log sockets passed."); + return -EINVAL; + } + + s->syslog_fd = fd; + } else { log_error("Unknown socket passed."); return -EINVAL; @@ -882,54 +1680,25 @@ static int server_init(Server *s) { if (r < 0) return r; - zero(ev); - ev.events = EPOLLIN; - ev.data.fd = s->syslog_fd; - if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) { - log_error("Failed to add syslog server fd to epoll object: %m"); - return -errno; - } - r = open_native_socket(s); if (r < 0) return r; - zero(ev); - ev.events = EPOLLIN; - ev.data.fd = s->native_fd; - if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) { - log_error("Failed to add native server fd to epoll object: %m"); - return -errno; - } - - s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func); - if (!s->user_journals) { - log_error("Out of memory."); - return -ENOMEM; - } + r = open_stdout_socket(s); + if (r < 0) + return r; r = system_journal_open(s); if (r < 0) return r; - assert_se(sigemptyset(&mask) == 0); - sigset_add_many(&mask, SIGINT, SIGTERM, -1); - assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); - - s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); - if (s->signal_fd < 0) { - log_error("signalfd(): %m"); - return -errno; - } - - zero(ev); - ev.events = EPOLLIN; - ev.data.fd = s->signal_fd; + r = open_signalfd(s); + if (r < 0) + return r; - if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) { - log_error("epoll_ctl(): %m"); - return -errno; - } + s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST); + if (!s->rate_limit) + return -ENOMEM; return 0; } @@ -938,6 +1707,9 @@ static void server_done(Server *s) { JournalFile *f; assert(s); + while (s->stdout_streams) + stdout_stream_free(s->stdout_streams); + if (s->system_journal) journal_file_close(s->system_journal); @@ -960,6 +1732,12 @@ static void server_done(Server *s) { if (s->native_fd >= 0) close_nointr_nofail(s->native_fd); + + if (s->stdout_fd >= 0) + close_nointr_nofail(s->stdout_fd); + + if (s->rate_limit) + journal_rate_limit_free(s->rate_limit); } int main(int argc, char *argv[]) { @@ -991,7 +1769,10 @@ int main(int argc, char *argv[]) { sd_notify(false, "READY=1\n" - "STATUS=Processing messages..."); + "STATUS=Processing requests..."); + + server_vacuum(&server); + server_flush_to_var(&server); for (;;) { struct epoll_event event; @@ -1015,6 +1796,8 @@ int main(int argc, char *argv[]) { break; } + log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid()); + finish: sd_notify(false, "STATUS=Shutting down...");