X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=blobdiff_plain;f=src%2Fjournal%2Fjournald.c;h=b290b5d2c063bb0c42639a4f090a4a5690424871;hb=8b18eb674ce4d14e4819e102a0d6679a0fd2e6ce;hp=9f753013a09755e2e2a479a94daa1a76e2f71896;hpb=6e409ce10d134625626d1eddfd6152755ef1908d;p=elogind.git diff --git a/src/journal/journald.c b/src/journal/journald.c index 9f753013a..b290b5d2c 100644 --- a/src/journal/journald.c +++ b/src/journal/journald.c @@ -40,10 +40,21 @@ #include "cgroup-util.h" #include "list.h" #include "journal-rate-limit.h" +#include "sd-journal.h" +#include "journal-internal.h" #define USER_JOURNALS_MAX 1024 #define STDOUT_STREAMS_MAX 4096 +#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC) +#define DEFAULT_RATE_LIMIT_BURST 200 + +#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC) + +#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC) + +#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC) + typedef struct StdoutStream StdoutStream; typedef struct Server { @@ -68,6 +79,11 @@ typedef struct Server { uint64_t max_use; bool compress; + uint64_t cached_available_space; + usec_t cached_available_space_timestamp; + + uint64_t var_available_timestamp; + LIST_HEAD(StdoutStream, stdout_streams); unsigned n_stdout_streams; } Server; @@ -99,6 +115,8 @@ struct StdoutStream { LIST_FIELDS(StdoutStream, stdout_stream); }; +static int server_flush_to_var(Server *s); + static uint64_t available_space(Server *s) { char ids[33]; sd_id128_t machine; @@ -108,6 +126,10 @@ static uint64_t available_space(Server *s) { uint64_t sum = 0, avail = 0, ss_avail = 0; int r; DIR *d; + usec_t ts = now(CLOCK_MONOTONIC); + + if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts) + return s->cached_available_space; r = sd_id128_get_machine(&machine); if (r < 0) @@ -163,6 +185,9 @@ static uint64_t available_space(Server *s) { if (ss_avail < avail) avail = ss_avail; + s->cached_available_space = avail; + s->cached_available_space_timestamp = ts; + finish: closedir(d); @@ -224,8 +249,12 @@ static JournalFile* find_journal(Server *s, uid_t uid) { assert(s); - /* We split up user logs only on /var, not on /run */ - if (!s->system_journal) + /* We split up user logs only on /var, not on /run. If the + * runtime file is open, we write to it exclusively, in order + * to guarantee proper order as soon as we flush /run to + * /var and close the runtime file. */ + + if (s->runtime_journal) return s->runtime_journal; if (uid <= 0) @@ -326,6 +355,8 @@ static void server_vacuum(Server *s) { if (r < 0 && r != -ENOENT) log_error("Failed to vacuum %s: %s", p, strerror(-r)); free(p); + + s->cached_available_space_timestamp = 0; } static char *shortened_cgroup_path(pid_t pid) { @@ -469,6 +500,8 @@ static void dispatch_message_real(Server *s, assert(n <= m); + server_flush_to_var(s); + retry: f = find_journal(s, realuid == 0 ? 0 : loginuid); if (!f) @@ -1071,6 +1104,243 @@ fail: return r; } +static int system_journal_open(Server *s) { + int r; + char *fn; + sd_id128_t machine; + char ids[33]; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return r; + + sd_id128_to_string(machine, ids); + + if (!s->system_journal) { + + /* First try to create the machine path, but not the prefix */ + fn = strappend("/var/log/journal/", ids); + if (!fn) + return -ENOMEM; + (void) mkdir(fn, 0755); + free(fn); + + /* The create the system journal file */ + fn = join("/var/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); + free(fn); + + if (r >= 0) { + s->system_journal->metrics = s->metrics; + s->system_journal->compress = s->compress; + + fix_perms(s->system_journal, 0); + } else if (r < 0) { + + if (r == -ENOENT) + r = 0; + else { + log_error("Failed to open system journal: %s", strerror(-r)); + return r; + } + } + } + + if (!s->runtime_journal) { + + fn = join("/run/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + if (s->system_journal) { + + /* Try to open the runtime journal, but only + * if it already exists, so that we can flush + * it into the system journal */ + + r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + + if (r == -ENOENT) + r = 0; + else { + log_error("Failed to open runtime journal: %s", strerror(-r)); + return r; + } + } + + } else { + + /* OK, we really need the runtime journal, so create + * it if necessary. */ + + (void) mkdir_parents(fn, 0755); + r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + log_error("Failed to open runtime journal: %s", strerror(-r)); + return r; + } + } + + if (s->runtime_journal) { + s->runtime_journal->metrics = s->metrics; + s->runtime_journal->compress = s->compress; + + fix_perms(s->runtime_journal, 0); + } + } + + return r; +} + +static int server_flush_to_var(Server *s) { + char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; + Object *o = NULL; + int r; + sd_id128_t machine; + sd_journal *j; + usec_t ts; + + assert(s); + + if (!s->runtime_journal) + return 0; + + ts = now(CLOCK_MONOTONIC); + if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts) + return 0; + + s->var_available_timestamp = ts; + + system_journal_open(s); + + if (!s->system_journal) + return 0; + + r = sd_id128_get_machine(&machine); + if (r < 0) { + log_error("Failed to get machine id: %s", strerror(-r)); + return r; + } + + r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY); + if (r < 0) { + log_error("Failed to read runtime journal: %s", strerror(-r)); + return r; + } + + SD_JOURNAL_FOREACH(j) { + JournalFile *f; + + f = j->current_file; + assert(f && f->current_offset > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) { + log_error("Can't read entry: %s", strerror(-r)); + goto finish; + } + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + if (r == -E2BIG) { + log_info("Allocation limit reached."); + + journal_file_post_change(s->system_journal); + server_vacuum(s); + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + } + + if (r < 0) { + log_error("Can't write entry: %s", strerror(-r)); + goto finish; + } + } + +finish: + journal_file_post_change(s->system_journal); + + journal_file_close(s->runtime_journal); + s->runtime_journal = NULL; + + if (r >= 0) { + sd_id128_to_string(machine, path + 17); + rm_rf(path, false, true, false); + } + + return r; +} + +static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) { + struct msghdr msghdr; + struct iovec iovec; + struct cmsghdr *cmsg; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) + + CMSG_SPACE(sizeof(struct timeval))]; + } control; + union sockaddr_union sa; + + assert(s); + + zero(msghdr); + + zero(iovec); + iovec.iov_base = (void*) buffer; + iovec.iov_len = length; + msghdr.msg_iov = &iovec; + msghdr.msg_iovlen = 1; + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path)); + msghdr.msg_name = &sa; + msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path); + + zero(control); + msghdr.msg_control = &control; + msghdr.msg_controllen = sizeof(control); + + cmsg = CMSG_FIRSTHDR(&msghdr); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_CREDENTIALS; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred)); + memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred)); + msghdr.msg_controllen = cmsg->cmsg_len; + + /* Forward the syslog message we received via /dev/log to + * /run/systemd/syslog. Unfortunately we currently can't set + * the SO_TIMESTAMP auxiliary data, and hence we don't. */ + + if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0) + return; + + if (errno == ESRCH) { + struct ucred u; + + /* Hmm, presumably the sender process vanished + * by now, so let's fix it as good as we + * can, and retry */ + + u = *ucred; + u.pid = getpid(); + memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred)); + + if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0) + return; + } + + log_debug("Failed to forward syslog message: %m"); +} + static int process_event(Server *s, struct epoll_event *ev) { assert(s); @@ -1095,6 +1365,11 @@ static int process_event(Server *s, struct epoll_event *ev) { return -errno; } + if (sfsi.ssi_signo == SIGUSR1) { + server_flush_to_var(s); + return 0; + } + log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo)); return 0; @@ -1186,6 +1461,7 @@ static int process_event(Server *s, struct epoll_event *ev) { else s->buffer[n] = 0; + forward_syslog(s, s->buffer, n, ucred, tv); process_syslog_message(s, strstrip(s->buffer), ucred, tv); } else process_native_message(s, s->buffer, n, ucred, tv); @@ -1230,70 +1506,11 @@ static int process_event(Server *s, struct epoll_event *ev) { return 0; } -static int system_journal_open(Server *s) { - int r; - char *fn; - sd_id128_t machine; - char ids[33]; - - r = sd_id128_get_machine(&machine); - if (r < 0) - return r; - - /* First try to create the machine path, but not the prefix */ - fn = strappend("/var/log/journal/", sd_id128_to_string(machine, ids)); - if (!fn) - return -ENOMEM; - (void) mkdir(fn, 0755); - free(fn); - - /* The create the system journal file */ - fn = join("/var/log/journal/", ids, "/system.journal", NULL); - if (!fn) - return -ENOMEM; - - r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); - free(fn); - - if (r >= 0) { - s->system_journal->metrics = s->metrics; - s->system_journal->compress = s->compress; - - fix_perms(s->system_journal, 0); - return r; - } - - if (r < 0 && r != -ENOENT) { - log_error("Failed to open system journal: %s", strerror(-r)); - return r; - } - - /* /var didn't work, so try /run, but this time we - * create the prefix too */ - fn = join("/run/log/journal/", ids, "/system.journal", NULL); - if (!fn) - return -ENOMEM; - - (void) mkdir_parents(fn, 0755); - r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); - free(fn); - - if (r < 0) { - log_error("Failed to open runtime journal: %s", strerror(-r)); - return r; - } - - s->runtime_journal->metrics = s->metrics; - s->runtime_journal->compress = s->compress; - - fix_perms(s->runtime_journal, 0); - return r; -} - static int open_syslog_socket(Server *s) { union sockaddr_union sa; int one, r; struct epoll_event ev; + struct timeval tv; assert(s); @@ -1307,7 +1524,7 @@ static int open_syslog_socket(Server *s) { zero(sa); sa.un.sun_family = AF_UNIX; - strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path)); + strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path)); unlink(sa.un.sun_path); @@ -1334,6 +1551,15 @@ static int open_syslog_socket(Server *s) { return -errno; } + /* Since we use the same socket for forwarding this to some + * other syslog implementation, make sure we don't hang + * forever */ + timeval_store(&tv, SYSLOG_TIMEOUT_USEC); + if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) { + log_error("SO_SNDTIMEO failed: %m"); + return -errno; + } + zero(ev); ev.events = EPOLLIN; ev.data.fd = s->syslog_fd; @@ -1453,7 +1679,7 @@ static int open_signalfd(Server *s) { assert(s); assert_se(sigemptyset(&mask) == 0); - sigset_add_many(&mask, SIGINT, SIGTERM, -1); + sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1); assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); @@ -1560,7 +1786,7 @@ static int server_init(Server *s) { if (r < 0) return r; - s->rate_limit = journal_rate_limit_new(10*USEC_PER_SEC, 2); + s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST); if (!s->rate_limit) return -ENOMEM; @@ -1635,6 +1861,9 @@ int main(int argc, char *argv[]) { "READY=1\n" "STATUS=Processing requests..."); + server_vacuum(&server); + server_flush_to_var(&server); + for (;;) { struct epoll_event event;