chiark / gitweb /
journald: don't rotate on startup
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "sd-login.h"
45 #include "journal-internal.h"
46
/* Upper bound on simultaneously open per-user journal files; when it
 * is reached, find_journal() closes one before opening another. */
#define USER_JOURNALS_MAX 1024

/* Presumably the cap on concurrent stdout stream connections (its use
 * is outside this part of the file -- confirm). */
#define STDOUT_STREAMS_MAX 4096

/* Rate limiter defaults (consumers are outside this part of the file). */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long a result computed by available_space() stays cached. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* Presumably the retry interval for probing /var (confirm against the
 * flush-to-var logic). */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* Timeout used by the syslog code (user is outside this part of the file). */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)
58
59 typedef struct StdoutStream StdoutStream;
60
61 typedef struct Server {
62         int epoll_fd;
63         int signal_fd;
64         int syslog_fd;
65         int native_fd;
66         int stdout_fd;
67
68         JournalFile *runtime_journal;
69         JournalFile *system_journal;
70         Hashmap *user_journals;
71
72         uint64_t seqnum;
73
74         char *buffer;
75         size_t buffer_size;
76
77         JournalRateLimit *rate_limit;
78
79         JournalMetrics runtime_metrics;
80         JournalMetrics system_metrics;
81
82         bool compress;
83
84         uint64_t cached_available_space;
85         usec_t cached_available_space_timestamp;
86
87         uint64_t var_available_timestamp;
88
89         LIST_HEAD(StdoutStream, stdout_streams);
90         unsigned n_stdout_streams;
91 } Server;
92
/* Parser state of a stdout stream connection. The first four states
 * each consume one header line, in this order; after that the stream
 * is in its final state. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,              /* next line: tag to prefix messages with */
        STDOUT_STREAM_PRIORITY,         /* next line: default priority, '0'..'7' */
        STDOUT_STREAM_PRIORITY_PREFIX,  /* next line: '0' or '1', parse "<N>" prefixes */
        STDOUT_STREAM_TEE_CONSOLE,      /* next line: '0' or '1', copy to the console */
        STDOUT_STREAM_RUNNING           /* header consumed */
} StdoutStreamState;
100
101 struct StdoutStream {
102         Server *server;
103         StdoutStreamState state;
104
105         int fd;
106
107         struct ucred ucred;
108
109         char *tag;
110         int priority;
111         bool priority_prefix:1;
112         bool tee_console:1;
113
114         char buffer[LINE_MAX+1];
115         size_t length;
116
117         LIST_FIELDS(StdoutStream, stdout_stream);
118 };
119
120 static int server_flush_to_var(Server *s);
121
122 static uint64_t available_space(Server *s) {
123         char ids[33], *p;
124         const char *f;
125         sd_id128_t machine;
126         struct statvfs ss;
127         uint64_t sum = 0, avail = 0, ss_avail = 0;
128         int r;
129         DIR *d;
130         usec_t ts;
131         JournalMetrics *m;
132
133         ts = now(CLOCK_MONOTONIC);
134
135         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
136                 return s->cached_available_space;
137
138         r = sd_id128_get_machine(&machine);
139         if (r < 0)
140                 return 0;
141
142         if (s->system_journal) {
143                 f = "/var/log/journal/";
144                 m = &s->system_metrics;
145         } else {
146                 f = "/run/log/journal/";
147                 m = &s->runtime_metrics;
148         }
149
150         assert(m);
151
152         p = strappend(f, sd_id128_to_string(machine, ids));
153         if (!p)
154                 return 0;
155
156         d = opendir(p);
157         free(p);
158
159         if (!d)
160                 return 0;
161
162         if (fstatvfs(dirfd(d), &ss) < 0)
163                 goto finish;
164
165         for (;;) {
166                 struct stat st;
167                 struct dirent buf, *de;
168                 int k;
169
170                 k = readdir_r(d, &buf, &de);
171                 if (k != 0) {
172                         r = -k;
173                         goto finish;
174                 }
175
176                 if (!de)
177                         break;
178
179                 if (!dirent_is_file_with_suffix(de, ".journal"))
180                         continue;
181
182                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
183                         continue;
184
185                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
186         }
187
188         avail = sum >= m->max_use ? 0 : m->max_use - sum;
189
190         ss_avail = ss.f_bsize * ss.f_bavail;
191
192         ss_avail = ss_avail < m->keep_free ? 0 : ss_avail - m->keep_free;
193
194         if (ss_avail < avail)
195                 avail = ss_avail;
196
197         s->cached_available_space = avail;
198         s->cached_available_space_timestamp = ts;
199
200 finish:
201         closedir(d);
202
203         return avail;
204 }
205
206 static void fix_perms(JournalFile *f, uid_t uid) {
207         acl_t acl;
208         acl_entry_t entry;
209         acl_permset_t permset;
210         int r;
211
212         assert(f);
213
214         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
215         if (r < 0)
216                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
217
218         if (uid <= 0)
219                 return;
220
221         acl = acl_get_fd(f->fd);
222         if (!acl) {
223                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
224                 return;
225         }
226
227         r = acl_find_uid(acl, uid, &entry);
228         if (r <= 0) {
229
230                 if (acl_create_entry(&acl, &entry) < 0 ||
231                     acl_set_tag_type(entry, ACL_USER) < 0 ||
232                     acl_set_qualifier(entry, &uid) < 0) {
233                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
234                         goto finish;
235                 }
236         }
237
238         if (acl_get_permset(entry, &permset) < 0 ||
239             acl_add_perm(permset, ACL_READ) < 0 ||
240             acl_calc_mask(&acl) < 0) {
241                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
242                 goto finish;
243         }
244
245         if (acl_set_fd(f->fd, acl) < 0)
246                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
247
248 finish:
249         acl_free(acl);
250 }
251
252 static JournalFile* find_journal(Server *s, uid_t uid) {
253         char *p;
254         int r;
255         JournalFile *f;
256         char ids[33];
257         sd_id128_t machine;
258
259         assert(s);
260
261         /* We split up user logs only on /var, not on /run. If the
262          * runtime file is open, we write to it exclusively, in order
263          * to guarantee proper order as soon as we flush /run to
264          * /var and close the runtime file. */
265
266         if (s->runtime_journal)
267                 return s->runtime_journal;
268
269         if (uid <= 0)
270                 return s->system_journal;
271
272         r = sd_id128_get_machine(&machine);
273         if (r < 0)
274                 return s->system_journal;
275
276         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
277         if (f)
278                 return f;
279
280         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
281                 return s->system_journal;
282
283         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
284                 /* Too many open? Then let's close one */
285                 f = hashmap_steal_first(s->user_journals);
286                 assert(f);
287                 journal_file_close(f);
288         }
289
290         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
291         free(p);
292
293         if (r < 0)
294                 return s->system_journal;
295
296         fix_perms(f, uid);
297         f->metrics = s->system_metrics;
298         f->compress = s->compress;
299
300         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
301         if (r < 0) {
302                 journal_file_close(f);
303                 return s->system_journal;
304         }
305
306         return f;
307 }
308
309 static void server_rotate(Server *s) {
310         JournalFile *f;
311         void *k;
312         Iterator i;
313         int r;
314
315         log_info("Rotating...");
316
317         if (s->runtime_journal) {
318                 r = journal_file_rotate(&s->runtime_journal);
319                 if (r < 0)
320                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
321         }
322
323         if (s->system_journal) {
324                 r = journal_file_rotate(&s->system_journal);
325                 if (r < 0)
326                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
327         }
328
329         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
330                 r = journal_file_rotate(&f);
331                 if (r < 0)
332                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
333                 else
334                         hashmap_replace(s->user_journals, k, f);
335         }
336 }
337
338 static void server_vacuum(Server *s) {
339         char *p;
340         char ids[33];
341         sd_id128_t machine;
342         int r;
343
344         log_info("Vacuuming...");
345
346         r = sd_id128_get_machine(&machine);
347         if (r < 0) {
348                 log_error("Failed to get machine ID: %s", strerror(-r));
349                 return;
350         }
351
352         sd_id128_to_string(machine, ids);
353
354         if (s->system_journal) {
355                 if (asprintf(&p, "/var/log/journal/%s", ids) < 0) {
356                         log_error("Out of memory.");
357                         return;
358                 }
359
360                 r = journal_directory_vacuum(p, s->system_metrics.max_use, s->system_metrics.keep_free);
361                 if (r < 0 && r != -ENOENT)
362                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
363                 free(p);
364         }
365
366
367         if (s->runtime_journal) {
368                 if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
369                         log_error("Out of memory.");
370                         return;
371                 }
372
373                 r = journal_directory_vacuum(p, s->runtime_metrics.max_use, s->runtime_metrics.keep_free);
374                 if (r < 0 && r != -ENOENT)
375                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
376                 free(p);
377         }
378
379         s->cached_available_space_timestamp = 0;
380 }
381
382 static char *shortened_cgroup_path(pid_t pid) {
383         int r;
384         char *process_path, *init_path, *path;
385
386         assert(pid > 0);
387
388         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
389         if (r < 0)
390                 return NULL;
391
392         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
393         if (r < 0) {
394                 free(process_path);
395                 return NULL;
396         }
397
398         if (endswith(init_path, "/system"))
399                 init_path[strlen(init_path) - 7] = 0;
400         else if (streq(init_path, "/"))
401                 init_path[0] = 0;
402
403         if (startswith(process_path, init_path)) {
404                 char *p;
405
406                 p = strdup(process_path + strlen(init_path));
407                 if (!p) {
408                         free(process_path);
409                         free(init_path);
410                         return NULL;
411                 }
412                 path = p;
413         } else {
414                 path = process_path;
415                 process_path = NULL;
416         }
417
418         free(process_path);
419         free(init_path);
420
421         return path;
422 }
423
424 static void dispatch_message_real(Server *s,
425                              struct iovec *iovec, unsigned n, unsigned m,
426                              struct ucred *ucred,
427                              struct timeval *tv) {
428
429         char *pid = NULL, *uid = NULL, *gid = NULL,
430                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
431                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
432                 *audit_session = NULL, *audit_loginuid = NULL,
433                 *exe = NULL, *cgroup = NULL, *session = NULL,
434                 *owner_uid = NULL, *service = NULL;
435
436         char idbuf[33];
437         sd_id128_t id;
438         int r;
439         char *t;
440         uid_t loginuid = 0, realuid = 0;
441         JournalFile *f;
442         bool vacuumed = false;
443
444         assert(s);
445         assert(iovec);
446         assert(n > 0);
447         assert(n + 16 <= m);
448
449         if (ucred) {
450                 uint32_t audit;
451                 uid_t owner;
452
453                 realuid = ucred->uid;
454
455                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
456                         IOVEC_SET_STRING(iovec[n++], pid);
457
458                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
459                         IOVEC_SET_STRING(iovec[n++], uid);
460
461                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
462                         IOVEC_SET_STRING(iovec[n++], gid);
463
464                 r = get_process_comm(ucred->pid, &t);
465                 if (r >= 0) {
466                         comm = strappend("_COMM=", t);
467                         free(t);
468
469                         if (comm)
470                                 IOVEC_SET_STRING(iovec[n++], comm);
471                 }
472
473                 r = get_process_exe(ucred->pid, &t);
474                 if (r >= 0) {
475                         exe = strappend("_EXE=", t);
476                         free(t);
477
478                         if (comm)
479                                 IOVEC_SET_STRING(iovec[n++], exe);
480                 }
481
482                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
483                 if (r >= 0) {
484                         cmdline = strappend("_CMDLINE=", t);
485                         free(t);
486
487                         if (cmdline)
488                                 IOVEC_SET_STRING(iovec[n++], cmdline);
489                 }
490
491                 r = audit_session_from_pid(ucred->pid, &audit);
492                 if (r >= 0)
493                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) audit) >= 0)
494                                 IOVEC_SET_STRING(iovec[n++], audit_session);
495
496                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
497                 if (r >= 0)
498                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
499                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
500
501                 t = shortened_cgroup_path(ucred->pid);
502                 if (t) {
503                         cgroup = strappend("_SYSTEMD_CGROUP=", t);
504                         free(t);
505
506                         if (cgroup)
507                                 IOVEC_SET_STRING(iovec[n++], cgroup);
508                 }
509
510                 if (sd_pid_get_session(ucred->pid, &t) >= 0) {
511                         session = strappend("_SYSTEMD_SESSION=", t);
512                         free(t);
513
514                         if (session)
515                                 IOVEC_SET_STRING(iovec[n++], session);
516                 }
517
518                 if (sd_pid_get_service(ucred->pid, &t) >= 0) {
519                         service = strappend("_SYSTEMD_SERVICE=", t);
520                         free(t);
521
522                         if (service)
523                                 IOVEC_SET_STRING(iovec[n++], service);
524                 }
525
526                 if (sd_pid_get_owner_uid(ucred->uid, &owner) >= 0)
527                         if (asprintf(&owner_uid, "_SYSTEMD_OWNER_UID=%lu", (unsigned long) owner) >= 0)
528                                 IOVEC_SET_STRING(iovec[n++], owner_uid);
529         }
530
531         if (tv) {
532                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
533                              (unsigned long long) timeval_load(tv)) >= 0)
534                         IOVEC_SET_STRING(iovec[n++], source_time);
535         }
536
537         /* Note that strictly speaking storing the boot id here is
538          * redundant since the entry includes this in-line
539          * anyway. However, we need this indexed, too. */
540         r = sd_id128_get_boot(&id);
541         if (r >= 0)
542                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
543                         IOVEC_SET_STRING(iovec[n++], boot_id);
544
545         r = sd_id128_get_machine(&id);
546         if (r >= 0)
547                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
548                         IOVEC_SET_STRING(iovec[n++], machine_id);
549
550         t = gethostname_malloc();
551         if (t) {
552                 hostname = strappend("_HOSTNAME=", t);
553                 free(t);
554                 if (hostname)
555                         IOVEC_SET_STRING(iovec[n++], hostname);
556         }
557
558         assert(n <= m);
559
560         server_flush_to_var(s);
561
562 retry:
563         f = find_journal(s, realuid == 0 ? 0 : loginuid);
564         if (!f)
565                 log_warning("Dropping message, as we can't find a place to store the data.");
566         else {
567                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
568
569                 if (r == -E2BIG && !vacuumed) {
570                         log_info("Allocation limit reached.");
571
572                         server_rotate(s);
573                         server_vacuum(s);
574                         vacuumed = true;
575
576                         log_info("Retrying write.");
577                         goto retry;
578                 }
579
580                 if (r < 0)
581                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
582         }
583
584         free(pid);
585         free(uid);
586         free(gid);
587         free(comm);
588         free(exe);
589         free(cmdline);
590         free(source_time);
591         free(boot_id);
592         free(machine_id);
593         free(hostname);
594         free(audit_session);
595         free(audit_loginuid);
596         free(cgroup);
597         free(session);
598         free(owner_uid);
599         free(service);
600 }
601
602 static void dispatch_message(Server *s,
603                              struct iovec *iovec, unsigned n, unsigned m,
604                              struct ucred *ucred,
605                              struct timeval *tv,
606                              int priority) {
607         int rl;
608         char *path = NULL, *c;
609
610         assert(s);
611         assert(iovec || n == 0);
612
613         if (n == 0)
614                 return;
615
616         if (!ucred)
617                 goto finish;
618
619         path = shortened_cgroup_path(ucred->pid);
620         if (!path)
621                 goto finish;
622
623         /* example: /user/lennart/3/foobar
624          *          /system/dbus.service/foobar
625          *
626          * So let's cut of everything past the third /, since that is
627          * wher user directories start */
628
629         c = strchr(path, '/');
630         if (c) {
631                 c = strchr(c+1, '/');
632                 if (c) {
633                         c = strchr(c+1, '/');
634                         if (c)
635                                 *c = 0;
636                 }
637         }
638
639         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
640
641         if (rl == 0) {
642                 free(path);
643                 return;
644         }
645
646         if (rl > 1) {
647                 int j = 0;
648                 char suppress_message[LINE_MAX];
649                 struct iovec suppress_iovec[18];
650
651                 /* Write a suppression message if we suppressed something */
652
653                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
654                 char_array_0(suppress_message);
655
656                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
657                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
658
659                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
660         }
661
662         free(path);
663
664 finish:
665         dispatch_message_real(s, iovec, n, m, ucred, tv);
666 }
667
668 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
669         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
670         struct iovec iovec[19];
671         unsigned n = 0;
672         int priority = LOG_USER | LOG_INFO;
673
674         assert(s);
675         assert(buf);
676
677         parse_syslog_priority((char**) &buf, &priority);
678         skip_syslog_date((char**) &buf);
679
680         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
681                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
682
683         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
684                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
685
686         message = strappend("MESSAGE=", buf);
687         if (message)
688                 IOVEC_SET_STRING(iovec[n++], message);
689
690         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
691
692         free(message);
693         free(syslog_facility);
694         free(syslog_priority);
695 }
696
/* Checks whether the l bytes at p form a field name that clients may
 * set themselves.
 *
 * This roughly follows the POSIX syntax recommendations for
 * environment variable names, plus a few extra restrictions:
 *
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
 *
 * A name is accepted if it is 1 to 64 characters long, consists only
 * of 'A'-'Z', '0'-'9' and '_', and starts with neither a digit nor an
 * underscore (names with a leading underscore are protected). */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        /* Neither empty nor longer than 64 characters */
        if (l == 0 || l > 64)
                return false;

        /* No protected name, no leading digit */
        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                char c = p[i];
                bool upper = c >= 'A' && c <= 'Z';
                bool digit = c >= '0' && c <= '9';

                if (!upper && !digit && c != '_')
                        return false;
        }

        return true;
}
731
732 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
733         struct iovec *iovec = NULL;
734         unsigned n = 0, m = 0, j;
735         const char *p;
736         size_t remaining;
737         int priority = LOG_INFO;
738
739         assert(s);
740         assert(buffer || n == 0);
741
742         p = buffer;
743         remaining = buffer_size;
744
745         while (remaining > 0) {
746                 const char *e, *q;
747
748                 e = memchr(p, '\n', remaining);
749
750                 if (!e) {
751                         /* Trailing noise, let's ignore it, and flush what we collected */
752                         log_debug("Received message with trailing noise, ignoring.");
753                         break;
754                 }
755
756                 if (e == p) {
757                         /* Entry separator */
758                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
759                         n = 0;
760                         priority = LOG_INFO;
761
762                         p++;
763                         remaining--;
764                         continue;
765                 }
766
767                 if (*p == '.' || *p == '#') {
768                         /* Ignore control commands for now, and
769                          * comments too. */
770                         remaining -= (e - p) + 1;
771                         p = e + 1;
772                         continue;
773                 }
774
775                 /* A property follows */
776
777                 if (n+16 >= m) {
778                         struct iovec *c;
779                         unsigned u;
780
781                         u = MAX((n+16U) * 2U, 4U);
782                         c = realloc(iovec, u * sizeof(struct iovec));
783                         if (!c) {
784                                 log_error("Out of memory");
785                                 break;
786                         }
787
788                         iovec = c;
789                         m = u;
790                 }
791
792                 q = memchr(p, '=', e - p);
793                 if (q) {
794                         if (valid_user_field(p, q - p)) {
795                                 /* If the field name starts with an
796                                  * underscore, skip the variable,
797                                  * since that indidates a trusted
798                                  * field */
799                                 iovec[n].iov_base = (char*) p;
800                                 iovec[n].iov_len = e - p;
801                                 n++;
802
803                                 /* We need to determine the priority
804                                  * of this entry for the rate limiting
805                                  * logic */
806                                 if (e - p == 10 &&
807                                     memcmp(p, "PRIORITY=", 10) == 0 &&
808                                     p[10] >= '0' &&
809                                     p[10] <= '9')
810                                         priority = p[10] - '0';
811                         }
812
813                         remaining -= (e - p) + 1;
814                         p = e + 1;
815                         continue;
816                 } else {
817                         uint64_t l;
818                         char *k;
819
820                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
821                                 log_debug("Failed to parse message, ignoring.");
822                                 break;
823                         }
824
825                         memcpy(&l, e + 1, sizeof(uint64_t));
826                         l = le64toh(l);
827
828                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
829                             e[1+sizeof(uint64_t)+l] != '\n') {
830                                 log_debug("Failed to parse message, ignoring.");
831                                 break;
832                         }
833
834                         k = malloc((e - p) + 1 + l);
835                         if (!k) {
836                                 log_error("Out of memory");
837                                 break;
838                         }
839
840                         memcpy(k, p, e - p);
841                         k[e - p] = '=';
842                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
843
844                         if (valid_user_field(p, e - p)) {
845                                 iovec[n].iov_base = k;
846                                 iovec[n].iov_len = (e - p) + 1 + l;
847                                 n++;
848                         } else
849                                 free(k);
850
851                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
852                         p = e + 1 + sizeof(uint64_t) + l + 1;
853                 }
854         }
855
856         dispatch_message(s, iovec, n, m, ucred, tv, priority);
857
858         for (j = 0; j < n; j++)
859                 if (iovec[j].iov_base < buffer ||
860                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
861                         free(iovec[j].iov_base);
862 }
863
864 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
865         struct iovec iovec[18];
866         char *message = NULL, *syslog_priority = NULL;
867         unsigned n = 0;
868         size_t tag_len;
869         int priority;
870
871         assert(s);
872         assert(p);
873
874         priority = s->priority;
875
876         if (s->priority_prefix &&
877             l > 3 &&
878             p[0] == '<' &&
879             p[1] >= '0' && p[1] <= '7' &&
880             p[2] == '>') {
881
882                 priority = p[1] - '0';
883                 p += 3;
884                 l -= 3;
885         }
886
887         if (l <= 0)
888                 return 0;
889
890         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
891                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
892
893         tag_len = s->tag ? strlen(s->tag) + 2: 0;
894         message = malloc(8 + tag_len + l);
895         if (message) {
896                 memcpy(message, "MESSAGE=", 8);
897
898                 if (s->tag) {
899                         memcpy(message+8, s->tag, tag_len-2);
900                         memcpy(message+8+tag_len-2, ": ", 2);
901                 }
902
903                 memcpy(message+8+tag_len, p, l);
904                 iovec[n].iov_base = message;
905                 iovec[n].iov_len = 8+tag_len+l;
906                 n++;
907         }
908
909         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
910
911         if (s->tee_console) {
912                 int console;
913
914                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
915                 if (console >= 0) {
916                         n = 0;
917                         if (s->tag) {
918                                 IOVEC_SET_STRING(iovec[n++], s->tag);
919                                 IOVEC_SET_STRING(iovec[n++], ": ");
920                         }
921
922                         iovec[n].iov_base = (void*) p;
923                         iovec[n].iov_len = l;
924                         n++;
925
926                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
927
928                         writev(console, iovec, n);
929                 }
930         }
931
932         free(message);
933         free(syslog_priority);
934
935         return 0;
936 }
937
938 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
939         assert(s);
940         assert(p);
941
942         while (l > 0 && strchr(WHITESPACE, *p)) {
943                 l--;
944                 p++;
945         }
946
947         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
948                 l--;
949
950         switch (s->state) {
951
952         case STDOUT_STREAM_TAG:
953
954                 if (l > 0) {
955                         s->tag = strndup(p, l);
956                         if (!s->tag) {
957                                 log_error("Out of memory");
958                                 return -EINVAL;
959                         }
960                 }
961
962                 s->state = STDOUT_STREAM_PRIORITY;
963                 return 0;
964
965         case STDOUT_STREAM_PRIORITY:
966                 if (l != 1 || *p < '0' || *p > '7') {
967                         log_warning("Failed to parse log priority line.");
968                         return -EINVAL;
969                 }
970
971                 s->priority = *p - '0';
972                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
973                 return 0;
974
975         case STDOUT_STREAM_PRIORITY_PREFIX:
976                 if (l != 1 || *p < '0' || *p > '1') {
977                         log_warning("Failed to parse priority prefix line.");
978                         return -EINVAL;
979                 }
980
981                 s->priority_prefix = *p - '0';
982                 s->state = STDOUT_STREAM_TEE_CONSOLE;
983                 return 0;
984
985         case STDOUT_STREAM_TEE_CONSOLE:
986                 if (l != 1 || *p < '0' || *p > '1') {
987                         log_warning("Failed to parse tee to console line.");
988                         return -EINVAL;
989                 }
990
991                 s->tee_console = *p - '0';
992                 s->state = STDOUT_STREAM_RUNNING;
993                 return 0;
994
995         case STDOUT_STREAM_RUNNING:
996                 return stdout_stream_log(s, p, l);
997         }
998
999         assert_not_reached("Unknown stream state");
1000 }
1001
1002 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
1003         char *p;
1004         size_t remaining;
1005         int r;
1006
1007         assert(s);
1008
1009         p = s->buffer;
1010         remaining = s->length;
1011         for (;;) {
1012                 char *end;
1013                 size_t skip;
1014
1015                 end = memchr(p, '\n', remaining);
1016                 if (!end) {
1017                         if (remaining >= LINE_MAX) {
1018                                 end = p + LINE_MAX;
1019                                 skip = LINE_MAX;
1020                         } else
1021                                 break;
1022                 } else
1023                         skip = end - p + 1;
1024
1025                 r = stdout_stream_line(s, p, end - p);
1026                 if (r < 0)
1027                         return r;
1028
1029                 remaining -= skip;
1030                 p += skip;
1031         }
1032
1033         if (force_flush && remaining > 0) {
1034                 r = stdout_stream_line(s, p, remaining);
1035                 if (r < 0)
1036                         return r;
1037
1038                 p += remaining;
1039                 remaining = 0;
1040         }
1041
1042         if (p > s->buffer) {
1043                 memmove(s->buffer, p, remaining);
1044                 s->length = remaining;
1045         }
1046
1047         return 0;
1048 }
1049
1050 static int stdout_stream_process(StdoutStream *s) {
1051         ssize_t l;
1052         int r;
1053
1054         assert(s);
1055
1056         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1057         if (l < 0) {
1058
1059                 if (errno == EAGAIN)
1060                         return 0;
1061
1062                 log_warning("Failed to read from stream: %m");
1063                 return -errno;
1064         }
1065
1066         if (l == 0) {
1067                 r = stdout_stream_scan(s, true);
1068                 if (r < 0)
1069                         return r;
1070
1071                 return 0;
1072         }
1073
1074         s->length += l;
1075         r = stdout_stream_scan(s, false);
1076         if (r < 0)
1077                 return r;
1078
1079         return 1;
1080
1081 }
1082
1083 static void stdout_stream_free(StdoutStream *s) {
1084         assert(s);
1085
1086         if (s->server) {
1087                 assert(s->server->n_stdout_streams > 0);
1088                 s->server->n_stdout_streams --;
1089                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1090         }
1091
1092         if (s->fd >= 0) {
1093                 if (s->server)
1094                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1095
1096                 close_nointr_nofail(s->fd);
1097         }
1098
1099         free(s->tag);
1100         free(s);
1101 }
1102
1103 static int stdout_stream_new(Server *s) {
1104         StdoutStream *stream;
1105         int fd, r;
1106         socklen_t len;
1107         struct epoll_event ev;
1108
1109         assert(s);
1110
1111         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1112         if (fd < 0) {
1113                 if (errno == EAGAIN)
1114                         return 0;
1115
1116                 log_error("Failed to accept stdout connection: %m");
1117                 return -errno;
1118         }
1119
1120         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1121                 log_warning("Too many stdout streams, refusing connection.");
1122                 close_nointr_nofail(fd);
1123                 return 0;
1124         }
1125
1126         stream = new0(StdoutStream, 1);
1127         if (!stream) {
1128                 log_error("Out of memory.");
1129                 close_nointr_nofail(fd);
1130                 return -ENOMEM;
1131         }
1132
1133         stream->fd = fd;
1134
1135         len = sizeof(stream->ucred);
1136         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1137                 log_error("Failed to determine peer credentials: %m");
1138                 r = -errno;
1139                 goto fail;
1140         }
1141
1142         if (shutdown(fd, SHUT_WR) < 0) {
1143                 log_error("Failed to shutdown writing side of socket: %m");
1144                 r = -errno;
1145                 goto fail;
1146         }
1147
1148         zero(ev);
1149         ev.data.ptr = stream;
1150         ev.events = EPOLLIN;
1151         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1152                 log_error("Failed to add stream to event loop: %m");
1153                 r = -errno;
1154                 goto fail;
1155         }
1156
1157         stream->server = s;
1158         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1159         s->n_stdout_streams ++;
1160
1161         return 0;
1162
1163 fail:
1164         stdout_stream_free(stream);
1165         return r;
1166 }
1167
1168 static int system_journal_open(Server *s) {
1169         int r;
1170         char *fn;
1171         sd_id128_t machine;
1172         char ids[33];
1173
1174         r = sd_id128_get_machine(&machine);
1175         if (r < 0)
1176                 return r;
1177
1178         sd_id128_to_string(machine, ids);
1179
1180         if (!s->system_journal) {
1181
1182                 /* First try to create the machine path, but not the prefix */
1183                 fn = strappend("/var/log/journal/", ids);
1184                 if (!fn)
1185                         return -ENOMEM;
1186                 (void) mkdir(fn, 0755);
1187                 free(fn);
1188
1189                 /* The create the system journal file */
1190                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1191                 if (!fn)
1192                         return -ENOMEM;
1193
1194                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1195                 free(fn);
1196
1197                 if (r >= 0) {
1198                         journal_default_metrics(&s->system_metrics, s->system_journal->fd);
1199
1200                         s->system_journal->metrics = s->system_metrics;
1201                         s->system_journal->compress = s->compress;
1202
1203                         fix_perms(s->system_journal, 0);
1204                 } else if (r < 0) {
1205
1206                         if (r != -ENOENT && r != -EROFS)
1207                                 log_warning("Failed to open system journal: %s", strerror(-r));
1208
1209                         r = 0;
1210                 }
1211         }
1212
1213         if (!s->runtime_journal) {
1214
1215                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1216                 if (!fn)
1217                         return -ENOMEM;
1218
1219                 if (s->system_journal) {
1220
1221                         /* Try to open the runtime journal, but only
1222                          * if it already exists, so that we can flush
1223                          * it into the system journal */
1224
1225                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1226                         free(fn);
1227
1228                         if (r < 0) {
1229                                 if (r != -ENOENT)
1230                                         log_warning("Failed to open runtime journal: %s", strerror(-r));
1231
1232                                 r = 0;
1233                         }
1234
1235                 } else {
1236
1237                         /* OK, we really need the runtime journal, so create
1238                          * it if necessary. */
1239
1240                         (void) mkdir_parents(fn, 0755);
1241                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1242                         free(fn);
1243
1244                         if (r < 0) {
1245                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1246                                 return r;
1247                         }
1248                 }
1249
1250                 if (s->runtime_journal) {
1251                         journal_default_metrics(&s->runtime_metrics, s->runtime_journal->fd);
1252
1253                         s->runtime_journal->metrics = s->runtime_metrics;
1254                         s->runtime_journal->compress = s->compress;
1255
1256                         fix_perms(s->runtime_journal, 0);
1257                 }
1258         }
1259
1260         return r;
1261 }
1262
1263 static int server_flush_to_var(Server *s) {
1264         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1265         Object *o = NULL;
1266         int r;
1267         sd_id128_t machine;
1268         sd_journal *j;
1269         usec_t ts;
1270
1271         assert(s);
1272
1273         if (!s->runtime_journal)
1274                 return 0;
1275
1276         ts = now(CLOCK_MONOTONIC);
1277         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1278                 return 0;
1279
1280         s->var_available_timestamp = ts;
1281
1282         system_journal_open(s);
1283
1284         if (!s->system_journal)
1285                 return 0;
1286
1287         r = sd_id128_get_machine(&machine);
1288         if (r < 0) {
1289                 log_error("Failed to get machine id: %s", strerror(-r));
1290                 return r;
1291         }
1292
1293         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1294         if (r < 0) {
1295                 log_error("Failed to read runtime journal: %s", strerror(-r));
1296                 return r;
1297         }
1298
1299         SD_JOURNAL_FOREACH(j) {
1300                 JournalFile *f;
1301
1302                 f = j->current_file;
1303                 assert(f && f->current_offset > 0);
1304
1305                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1306                 if (r < 0) {
1307                         log_error("Can't read entry: %s", strerror(-r));
1308                         goto finish;
1309                 }
1310
1311                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1312                 if (r == -E2BIG) {
1313                         log_info("Allocation limit reached.");
1314
1315                         journal_file_post_change(s->system_journal);
1316                         server_rotate(s);
1317                         server_vacuum(s);
1318
1319                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1320                 }
1321
1322                 if (r < 0) {
1323                         log_error("Can't write entry: %s", strerror(-r));
1324                         goto finish;
1325                 }
1326         }
1327
1328 finish:
1329         journal_file_post_change(s->system_journal);
1330
1331         journal_file_close(s->runtime_journal);
1332         s->runtime_journal = NULL;
1333
1334         if (r >= 0) {
1335                 sd_id128_to_string(machine, path + 17);
1336                 rm_rf(path, false, true, false);
1337         }
1338
1339         return r;
1340 }
1341
1342 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1343         struct msghdr msghdr;
1344         struct iovec iovec;
1345         struct cmsghdr *cmsg;
1346         union {
1347                 struct cmsghdr cmsghdr;
1348                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1349                             CMSG_SPACE(sizeof(struct timeval))];
1350         } control;
1351         union sockaddr_union sa;
1352
1353         assert(s);
1354
1355         zero(msghdr);
1356
1357         zero(iovec);
1358         iovec.iov_base = (void*) buffer;
1359         iovec.iov_len = length;
1360         msghdr.msg_iov = &iovec;
1361         msghdr.msg_iovlen = 1;
1362
1363         zero(sa);
1364         sa.un.sun_family = AF_UNIX;
1365         strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1366         msghdr.msg_name = &sa;
1367         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1368
1369         zero(control);
1370         msghdr.msg_control = &control;
1371         msghdr.msg_controllen = sizeof(control);
1372
1373         cmsg = CMSG_FIRSTHDR(&msghdr);
1374         cmsg->cmsg_level = SOL_SOCKET;
1375         cmsg->cmsg_type = SCM_CREDENTIALS;
1376         cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1377         memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1378         msghdr.msg_controllen = cmsg->cmsg_len;
1379
1380         /* Forward the syslog message we received via /dev/log to
1381          * /run/systemd/syslog. Unfortunately we currently can't set
1382          * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1383
1384         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1385                 return;
1386
1387         if (errno == ESRCH) {
1388                 struct ucred u;
1389
1390                 /* Hmm, presumably the sender process vanished
1391                  * by now, so let's fix it as good as we
1392                  * can, and retry */
1393
1394                 u = *ucred;
1395                 u.pid = getpid();
1396                 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1397
1398                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1399                         return;
1400         }
1401
1402         log_debug("Failed to forward syslog message: %m");
1403 }
1404
1405 static int process_event(Server *s, struct epoll_event *ev) {
1406         assert(s);
1407
1408         if (ev->data.fd == s->signal_fd) {
1409                 struct signalfd_siginfo sfsi;
1410                 ssize_t n;
1411
1412                 if (ev->events != EPOLLIN) {
1413                         log_info("Got invalid event from epoll.");
1414                         return -EIO;
1415                 }
1416
1417                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1418                 if (n != sizeof(sfsi)) {
1419
1420                         if (n >= 0)
1421                                 return -EIO;
1422
1423                         if (errno == EINTR || errno == EAGAIN)
1424                                 return 0;
1425
1426                         return -errno;
1427                 }
1428
1429                 if (sfsi.ssi_signo == SIGUSR1) {
1430                         server_flush_to_var(s);
1431                         return 0;
1432                 }
1433
1434                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1435                 return 0;
1436
1437         } else if (ev->data.fd == s->native_fd ||
1438                    ev->data.fd == s->syslog_fd) {
1439
1440                 if (ev->events != EPOLLIN) {
1441                         log_info("Got invalid event from epoll.");
1442                         return -EIO;
1443                 }
1444
1445                 for (;;) {
1446                         struct msghdr msghdr;
1447                         struct iovec iovec;
1448                         struct ucred *ucred = NULL;
1449                         struct timeval *tv = NULL;
1450                         struct cmsghdr *cmsg;
1451                         union {
1452                                 struct cmsghdr cmsghdr;
1453                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1454                                             CMSG_SPACE(sizeof(struct timeval))];
1455                         } control;
1456                         ssize_t n;
1457                         int v;
1458
1459                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1460                                 log_error("SIOCINQ failed: %m");
1461                                 return -errno;
1462                         }
1463
1464                         if (v <= 0)
1465                                 return 1;
1466
1467                         if (s->buffer_size < (size_t) v) {
1468                                 void *b;
1469                                 size_t l;
1470
1471                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1472                                 b = realloc(s->buffer, l+1);
1473
1474                                 if (!b) {
1475                                         log_error("Couldn't increase buffer.");
1476                                         return -ENOMEM;
1477                                 }
1478
1479                                 s->buffer_size = l;
1480                                 s->buffer = b;
1481                         }
1482
1483                         zero(iovec);
1484                         iovec.iov_base = s->buffer;
1485                         iovec.iov_len = s->buffer_size;
1486
1487                         zero(control);
1488                         zero(msghdr);
1489                         msghdr.msg_iov = &iovec;
1490                         msghdr.msg_iovlen = 1;
1491                         msghdr.msg_control = &control;
1492                         msghdr.msg_controllen = sizeof(control);
1493
1494                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1495                         if (n < 0) {
1496
1497                                 if (errno == EINTR || errno == EAGAIN)
1498                                         return 1;
1499
1500                                 log_error("recvmsg() failed: %m");
1501                                 return -errno;
1502                         }
1503
1504                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1505
1506                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1507                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1508                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1509                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1510                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1511                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1512                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1513                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1514                         }
1515
1516                         if (ev->data.fd == s->syslog_fd) {
1517                                 char *e;
1518
1519                                 e = memchr(s->buffer, '\n', n);
1520                                 if (e)
1521                                         *e = 0;
1522                                 else
1523                                         s->buffer[n] = 0;
1524
1525                                 forward_syslog(s, s->buffer, n, ucred, tv);
1526                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1527                         } else
1528                                 process_native_message(s, s->buffer, n, ucred, tv);
1529                 }
1530
1531                 return 1;
1532
1533         } else if (ev->data.fd == s->stdout_fd) {
1534
1535                 if (ev->events != EPOLLIN) {
1536                         log_info("Got invalid event from epoll.");
1537                         return -EIO;
1538                 }
1539
1540                 stdout_stream_new(s);
1541                 return 1;
1542
1543         } else {
1544                 StdoutStream *stream;
1545
1546                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1547                         log_info("Got invalid event from epoll.");
1548                         return -EIO;
1549                 }
1550
1551                 /* If it is none of the well-known fds, it must be an
1552                  * stdout stream fd. Note that this is a bit ugly here
1553                  * (since we rely that none of the well-known fds
1554                  * could be interpreted as pointer), but nonetheless
1555                  * safe, since the well-known fds would never get an
1556                  * fd > 4096, i.e. beyond the first memory page */
1557
1558                 stream = ev->data.ptr;
1559
1560                 if (stdout_stream_process(stream) <= 0)
1561                         stdout_stream_free(stream);
1562
1563                 return 1;
1564         }
1565
1566         log_error("Unknown event.");
1567         return 0;
1568 }
1569
1570 static int open_syslog_socket(Server *s) {
1571         union sockaddr_union sa;
1572         int one, r;
1573         struct epoll_event ev;
1574         struct timeval tv;
1575
1576         assert(s);
1577
1578         if (s->syslog_fd < 0) {
1579
1580                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1581                 if (s->syslog_fd < 0) {
1582                         log_error("socket() failed: %m");
1583                         return -errno;
1584                 }
1585
1586                 zero(sa);
1587                 sa.un.sun_family = AF_UNIX;
1588                 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1589
1590                 unlink(sa.un.sun_path);
1591
1592                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1593                 if (r < 0) {
1594                         log_error("bind() failed: %m");
1595                         return -errno;
1596                 }
1597
1598                 chmod(sa.un.sun_path, 0666);
1599         }
1600
1601         one = 1;
1602         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1603         if (r < 0) {
1604                 log_error("SO_PASSCRED failed: %m");
1605                 return -errno;
1606         }
1607
1608         one = 1;
1609         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1610         if (r < 0) {
1611                 log_error("SO_TIMESTAMP failed: %m");
1612                 return -errno;
1613         }
1614
1615         /* Since we use the same socket for forwarding this to some
1616          * other syslog implementation, make sure we don't hang
1617          * forever */
1618         timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1619         if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1620                 log_error("SO_SNDTIMEO failed: %m");
1621                 return -errno;
1622         }
1623
1624         zero(ev);
1625         ev.events = EPOLLIN;
1626         ev.data.fd = s->syslog_fd;
1627         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1628                 log_error("Failed to add syslog server fd to epoll object: %m");
1629                 return -errno;
1630         }
1631
1632         return 0;
1633 }
1634
1635 static int open_native_socket(Server*s) {
1636         union sockaddr_union sa;
1637         int one, r;
1638         struct epoll_event ev;
1639
1640         assert(s);
1641
1642         if (s->native_fd < 0) {
1643
1644                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1645                 if (s->native_fd < 0) {
1646                         log_error("socket() failed: %m");
1647                         return -errno;
1648                 }
1649
1650                 zero(sa);
1651                 sa.un.sun_family = AF_UNIX;
1652                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1653
1654                 unlink(sa.un.sun_path);
1655
1656                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1657                 if (r < 0) {
1658                         log_error("bind() failed: %m");
1659                         return -errno;
1660                 }
1661
1662                 chmod(sa.un.sun_path, 0666);
1663         }
1664
1665         one = 1;
1666         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1667         if (r < 0) {
1668                 log_error("SO_PASSCRED failed: %m");
1669                 return -errno;
1670         }
1671
1672         one = 1;
1673         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1674         if (r < 0) {
1675                 log_error("SO_TIMESTAMP failed: %m");
1676                 return -errno;
1677         }
1678
1679         zero(ev);
1680         ev.events = EPOLLIN;
1681         ev.data.fd = s->native_fd;
1682         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1683                 log_error("Failed to add native server fd to epoll object: %m");
1684                 return -errno;
1685         }
1686
1687         return 0;
1688 }
1689
1690 static int open_stdout_socket(Server *s) {
1691         union sockaddr_union sa;
1692         int r;
1693         struct epoll_event ev;
1694
1695         assert(s);
1696
1697         if (s->stdout_fd < 0) {
1698
1699                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1700                 if (s->stdout_fd < 0) {
1701                         log_error("socket() failed: %m");
1702                         return -errno;
1703                 }
1704
1705                 zero(sa);
1706                 sa.un.sun_family = AF_UNIX;
1707                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1708
1709                 unlink(sa.un.sun_path);
1710
1711                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1712                 if (r < 0) {
1713                         log_error("bind() failed: %m");
1714                         return -errno;
1715                 }
1716
1717                 chmod(sa.un.sun_path, 0666);
1718
1719                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1720                         log_error("liste() failed: %m");
1721                         return -errno;
1722                 }
1723         }
1724
1725         zero(ev);
1726         ev.events = EPOLLIN;
1727         ev.data.fd = s->stdout_fd;
1728         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1729                 log_error("Failed to add stdout server fd to epoll object: %m");
1730                 return -errno;
1731         }
1732
1733         return 0;
1734 }
1735
1736 static int open_signalfd(Server *s) {
1737         sigset_t mask;
1738         struct epoll_event ev;
1739
1740         assert(s);
1741
1742         assert_se(sigemptyset(&mask) == 0);
1743         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1744         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1745
1746         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1747         if (s->signal_fd < 0) {
1748                 log_error("signalfd(): %m");
1749                 return -errno;
1750         }
1751
1752         zero(ev);
1753         ev.events = EPOLLIN;
1754         ev.data.fd = s->signal_fd;
1755
1756         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1757                 log_error("epoll_ctl(): %m");
1758                 return -errno;
1759         }
1760
1761         return 0;
1762 }
1763
1764 static int server_init(Server *s) {
1765         int n, r, fd;
1766
1767         assert(s);
1768
1769         zero(*s);
1770         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1771         s->compress = true;
1772
1773         memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
1774         memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
1775
1776         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1777         if (!s->user_journals) {
1778                 log_error("Out of memory.");
1779                 return -ENOMEM;
1780         }
1781
1782         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1783         if (s->epoll_fd < 0) {
1784                 log_error("Failed to create epoll object: %m");
1785                 return -errno;
1786         }
1787
1788         n = sd_listen_fds(true);
1789         if (n < 0) {
1790                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1791                 return n;
1792         }
1793
1794         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1795
1796                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1797
1798                         if (s->native_fd >= 0) {
1799                                 log_error("Too many native sockets passed.");
1800                                 return -EINVAL;
1801                         }
1802
1803                         s->native_fd = fd;
1804
1805                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1806
1807                         if (s->stdout_fd >= 0) {
1808                                 log_error("Too many stdout sockets passed.");
1809                                 return -EINVAL;
1810                         }
1811
1812                         s->stdout_fd = fd;
1813
1814                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1815
1816                         if (s->syslog_fd >= 0) {
1817                                 log_error("Too many /dev/log sockets passed.");
1818                                 return -EINVAL;
1819                         }
1820
1821                         s->syslog_fd = fd;
1822
1823                 } else {
1824                         log_error("Unknown socket passed.");
1825                         return -EINVAL;
1826                 }
1827         }
1828
1829         r = open_syslog_socket(s);
1830         if (r < 0)
1831                 return r;
1832
1833         r = open_native_socket(s);
1834         if (r < 0)
1835                 return r;
1836
1837         r = open_stdout_socket(s);
1838         if (r < 0)
1839                 return r;
1840
1841         r = system_journal_open(s);
1842         if (r < 0)
1843                 return r;
1844
1845         r = open_signalfd(s);
1846         if (r < 0)
1847                 return r;
1848
1849         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1850         if (!s->rate_limit)
1851                 return -ENOMEM;
1852
1853         return 0;
1854 }
1855
1856 static void server_done(Server *s) {
1857         JournalFile *f;
1858         assert(s);
1859
1860         while (s->stdout_streams)
1861                 stdout_stream_free(s->stdout_streams);
1862
1863         if (s->system_journal)
1864                 journal_file_close(s->system_journal);
1865
1866         if (s->runtime_journal)
1867                 journal_file_close(s->runtime_journal);
1868
1869         while ((f = hashmap_steal_first(s->user_journals)))
1870                 journal_file_close(f);
1871
1872         hashmap_free(s->user_journals);
1873
1874         if (s->epoll_fd >= 0)
1875                 close_nointr_nofail(s->epoll_fd);
1876
1877         if (s->signal_fd >= 0)
1878                 close_nointr_nofail(s->signal_fd);
1879
1880         if (s->syslog_fd >= 0)
1881                 close_nointr_nofail(s->syslog_fd);
1882
1883         if (s->native_fd >= 0)
1884                 close_nointr_nofail(s->native_fd);
1885
1886         if (s->stdout_fd >= 0)
1887                 close_nointr_nofail(s->stdout_fd);
1888
1889         if (s->rate_limit)
1890                 journal_rate_limit_free(s->rate_limit);
1891
1892         free(s->buffer);
1893 }
1894
1895 int main(int argc, char *argv[]) {
1896         Server server;
1897         int r;
1898
1899         /* if (getppid() != 1) { */
1900         /*         log_error("This program should be invoked by init only."); */
1901         /*         return EXIT_FAILURE; */
1902         /* } */
1903
1904         if (argc > 1) {
1905                 log_error("This program does not take arguments.");
1906                 return EXIT_FAILURE;
1907         }
1908
1909         log_set_target(LOG_TARGET_CONSOLE);
1910         log_parse_environment();
1911         log_open();
1912
1913         umask(0022);
1914
1915         r = server_init(&server);
1916         if (r < 0)
1917                 goto finish;
1918
1919         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1920
1921         sd_notify(false,
1922                   "READY=1\n"
1923                   "STATUS=Processing requests...");
1924
1925         server_vacuum(&server);
1926         server_flush_to_var(&server);
1927
1928         for (;;) {
1929                 struct epoll_event event;
1930
1931                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1932                 if (r < 0) {
1933
1934                         if (errno == EINTR)
1935                                 continue;
1936
1937                         log_error("epoll_wait() failed: %m");
1938                         r = -errno;
1939                         goto finish;
1940                 } else if (r == 0)
1941                         break;
1942
1943                 r = process_event(&server, &event);
1944                 if (r < 0)
1945                         goto finish;
1946                 else if (r == 0)
1947                         break;
1948         }
1949
1950         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1951
1952 finish:
1953         sd_notify(false,
1954                   "STATUS=Shutting down...");
1955
1956         server_done(&server);
1957
1958         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1959 }