chiark / gitweb /
b42be8d7ad2788ef55d409fdb326b1c5b5dd8036
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "sd-login.h"
45 #include "journal-internal.h"
46
/* Maximum number of per-user journal files kept open at the same time;
 * find_journal() closes files once this many are open. */
#define USER_JOURNALS_MAX 1024

/* Upper bound on concurrently tracked stdout streams.
 * NOTE(review): enforced outside this part of the file — confirm. */
#define STDOUT_STREAMS_MAX 4096

/* Default parameters handed to the rate limiter. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long available_space() may return its cached result. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not referenced in this part of the file; presumably
 * throttles checks for /var becoming available — confirm. */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not referenced in this part of the file — confirm use. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)

typedef struct StdoutStream StdoutStream;
60
61 typedef struct Server {
62         int epoll_fd;
63         int signal_fd;
64         int syslog_fd;
65         int native_fd;
66         int stdout_fd;
67
68         JournalFile *runtime_journal;
69         JournalFile *system_journal;
70         Hashmap *user_journals;
71
72         uint64_t seqnum;
73
74         char *buffer;
75         size_t buffer_size;
76
77         JournalRateLimit *rate_limit;
78
79         JournalMetrics runtime_metrics;
80         JournalMetrics system_metrics;
81
82         bool compress;
83
84         uint64_t cached_available_space;
85         usec_t cached_available_space_timestamp;
86
87         uint64_t var_available_timestamp;
88
89         LIST_HEAD(StdoutStream, stdout_streams);
90         unsigned n_stdout_streams;
91 } Server;
92
93 typedef enum StdoutStreamState {
94         STDOUT_STREAM_TAG,
95         STDOUT_STREAM_PRIORITY,
96         STDOUT_STREAM_PRIORITY_PREFIX,
97         STDOUT_STREAM_TEE_CONSOLE,
98         STDOUT_STREAM_RUNNING
99 } StdoutStreamState;
100
101 struct StdoutStream {
102         Server *server;
103         StdoutStreamState state;
104
105         int fd;
106
107         struct ucred ucred;
108
109         char *tag;
110         int priority;
111         bool priority_prefix:1;
112         bool tee_console:1;
113
114         char buffer[LINE_MAX+1];
115         size_t length;
116
117         LIST_FIELDS(StdoutStream, stdout_stream);
118 };
119
120 static int server_flush_to_var(Server *s);
121
122 static uint64_t available_space(Server *s) {
123         char ids[33], *p;
124         const char *f;
125         sd_id128_t machine;
126         struct statvfs ss;
127         uint64_t sum = 0, avail = 0, ss_avail = 0;
128         int r;
129         DIR *d;
130         usec_t ts;
131         JournalMetrics *m;
132
133         ts = now(CLOCK_MONOTONIC);
134
135         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
136                 return s->cached_available_space;
137
138         r = sd_id128_get_machine(&machine);
139         if (r < 0)
140                 return 0;
141
142         if (s->system_journal) {
143                 f = "/var/log/journal/";
144                 m = &s->system_metrics;
145         } else {
146                 f = "/run/log/journal/";
147                 m = &s->runtime_metrics;
148         }
149
150         assert(m);
151
152         p = strappend(f, sd_id128_to_string(machine, ids));
153         if (!p)
154                 return 0;
155
156         d = opendir(p);
157         free(p);
158
159         if (!d)
160                 return 0;
161
162         if (fstatvfs(dirfd(d), &ss) < 0)
163                 goto finish;
164
165         for (;;) {
166                 struct stat st;
167                 struct dirent buf, *de;
168                 int k;
169
170                 k = readdir_r(d, &buf, &de);
171                 if (k != 0) {
172                         r = -k;
173                         goto finish;
174                 }
175
176                 if (!de)
177                         break;
178
179                 if (!dirent_is_file_with_suffix(de, ".journal"))
180                         continue;
181
182                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
183                         continue;
184
185                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
186         }
187
188         avail = sum >= m->max_use ? 0 : m->max_use - sum;
189
190         ss_avail = ss.f_bsize * ss.f_bavail;
191
192         ss_avail = ss_avail < m->keep_free ? 0 : ss_avail - m->keep_free;
193
194         if (ss_avail < avail)
195                 avail = ss_avail;
196
197         s->cached_available_space = avail;
198         s->cached_available_space_timestamp = ts;
199
200 finish:
201         closedir(d);
202
203         return avail;
204 }
205
206 static void fix_perms(JournalFile *f, uid_t uid) {
207         acl_t acl;
208         acl_entry_t entry;
209         acl_permset_t permset;
210         int r;
211
212         assert(f);
213
214         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
215         if (r < 0)
216                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
217
218         if (uid <= 0)
219                 return;
220
221         acl = acl_get_fd(f->fd);
222         if (!acl) {
223                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
224                 return;
225         }
226
227         r = acl_find_uid(acl, uid, &entry);
228         if (r <= 0) {
229
230                 if (acl_create_entry(&acl, &entry) < 0 ||
231                     acl_set_tag_type(entry, ACL_USER) < 0 ||
232                     acl_set_qualifier(entry, &uid) < 0) {
233                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
234                         goto finish;
235                 }
236         }
237
238         if (acl_get_permset(entry, &permset) < 0 ||
239             acl_add_perm(permset, ACL_READ) < 0 ||
240             acl_calc_mask(&acl) < 0) {
241                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
242                 goto finish;
243         }
244
245         if (acl_set_fd(f->fd, acl) < 0)
246                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
247
248 finish:
249         acl_free(acl);
250 }
251
252 static JournalFile* find_journal(Server *s, uid_t uid) {
253         char *p;
254         int r;
255         JournalFile *f;
256         char ids[33];
257         sd_id128_t machine;
258
259         assert(s);
260
261         /* We split up user logs only on /var, not on /run. If the
262          * runtime file is open, we write to it exclusively, in order
263          * to guarantee proper order as soon as we flush /run to
264          * /var and close the runtime file. */
265
266         if (s->runtime_journal)
267                 return s->runtime_journal;
268
269         if (uid <= 0)
270                 return s->system_journal;
271
272         r = sd_id128_get_machine(&machine);
273         if (r < 0)
274                 return s->system_journal;
275
276         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
277         if (f)
278                 return f;
279
280         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
281                 return s->system_journal;
282
283         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
284                 /* Too many open? Then let's close one */
285                 f = hashmap_steal_first(s->user_journals);
286                 assert(f);
287                 journal_file_close(f);
288         }
289
290         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
291         free(p);
292
293         if (r < 0)
294                 return s->system_journal;
295
296         fix_perms(f, uid);
297         f->metrics = s->system_metrics;
298         f->compress = s->compress;
299
300         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
301         if (r < 0) {
302                 journal_file_close(f);
303                 return s->system_journal;
304         }
305
306         return f;
307 }
308
309 static void server_vacuum(Server *s) {
310         Iterator i;
311         void *k;
312         char *p;
313         char ids[33];
314         sd_id128_t machine;
315         int r;
316         JournalFile *f;
317
318         log_info("Rotating...");
319
320         if (s->runtime_journal) {
321                 r = journal_file_rotate(&s->runtime_journal);
322                 if (r < 0)
323                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
324         }
325
326         if (s->system_journal) {
327                 r = journal_file_rotate(&s->system_journal);
328                 if (r < 0)
329                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
330         }
331
332         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
333                 r = journal_file_rotate(&f);
334                 if (r < 0)
335                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
336                 else
337                         hashmap_replace(s->user_journals, k, f);
338         }
339
340         log_info("Vacuuming...");
341
342         r = sd_id128_get_machine(&machine);
343         if (r < 0) {
344                 log_error("Failed to get machine ID: %s", strerror(-r));
345                 return;
346         }
347
348         sd_id128_to_string(machine, ids);
349
350         if (s->system_journal) {
351                 if (asprintf(&p, "/var/log/journal/%s", ids) < 0) {
352                         log_error("Out of memory.");
353                         return;
354                 }
355
356                 r = journal_directory_vacuum(p, s->system_metrics.max_use, s->system_metrics.keep_free);
357                 if (r < 0 && r != -ENOENT)
358                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
359                 free(p);
360         }
361
362
363         if (s->runtime_journal) {
364                 if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
365                         log_error("Out of memory.");
366                         return;
367                 }
368
369                 r = journal_directory_vacuum(p, s->runtime_metrics.max_use, s->runtime_metrics.keep_free);
370                 if (r < 0 && r != -ENOENT)
371                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
372                 free(p);
373         }
374
375         s->cached_available_space_timestamp = 0;
376 }
377
378 static char *shortened_cgroup_path(pid_t pid) {
379         int r;
380         char *process_path, *init_path, *path;
381
382         assert(pid > 0);
383
384         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
385         if (r < 0)
386                 return NULL;
387
388         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
389         if (r < 0) {
390                 free(process_path);
391                 return NULL;
392         }
393
394         if (endswith(init_path, "/system"))
395                 init_path[strlen(init_path) - 7] = 0;
396         else if (streq(init_path, "/"))
397                 init_path[0] = 0;
398
399         if (startswith(process_path, init_path)) {
400                 char *p;
401
402                 p = strdup(process_path + strlen(init_path));
403                 if (!p) {
404                         free(process_path);
405                         free(init_path);
406                         return NULL;
407                 }
408                 path = p;
409         } else {
410                 path = process_path;
411                 process_path = NULL;
412         }
413
414         free(process_path);
415         free(init_path);
416
417         return path;
418 }
419
420 static void dispatch_message_real(Server *s,
421                              struct iovec *iovec, unsigned n, unsigned m,
422                              struct ucred *ucred,
423                              struct timeval *tv) {
424
425         char *pid = NULL, *uid = NULL, *gid = NULL,
426                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
427                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
428                 *audit_session = NULL, *audit_loginuid = NULL,
429                 *exe = NULL, *cgroup = NULL, *session = NULL,
430                 *owner_uid = NULL, *service = NULL;
431
432         char idbuf[33];
433         sd_id128_t id;
434         int r;
435         char *t;
436         uid_t loginuid = 0, realuid = 0;
437         JournalFile *f;
438         bool vacuumed = false;
439
440         assert(s);
441         assert(iovec);
442         assert(n > 0);
443         assert(n + 16 <= m);
444
445         if (ucred) {
446                 uint32_t audit;
447                 uid_t owner;
448
449                 realuid = ucred->uid;
450
451                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
452                         IOVEC_SET_STRING(iovec[n++], pid);
453
454                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
455                         IOVEC_SET_STRING(iovec[n++], uid);
456
457                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
458                         IOVEC_SET_STRING(iovec[n++], gid);
459
460                 r = get_process_comm(ucred->pid, &t);
461                 if (r >= 0) {
462                         comm = strappend("_COMM=", t);
463                         free(t);
464
465                         if (comm)
466                                 IOVEC_SET_STRING(iovec[n++], comm);
467                 }
468
469                 r = get_process_exe(ucred->pid, &t);
470                 if (r >= 0) {
471                         exe = strappend("_EXE=", t);
472                         free(t);
473
474                         if (comm)
475                                 IOVEC_SET_STRING(iovec[n++], exe);
476                 }
477
478                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
479                 if (r >= 0) {
480                         cmdline = strappend("_CMDLINE=", t);
481                         free(t);
482
483                         if (cmdline)
484                                 IOVEC_SET_STRING(iovec[n++], cmdline);
485                 }
486
487                 r = audit_session_from_pid(ucred->pid, &audit);
488                 if (r >= 0)
489                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) audit) >= 0)
490                                 IOVEC_SET_STRING(iovec[n++], audit_session);
491
492                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
493                 if (r >= 0)
494                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
495                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
496
497                 t = shortened_cgroup_path(ucred->pid);
498                 if (t) {
499                         cgroup = strappend("_SYSTEMD_CGROUP=", t);
500                         free(t);
501
502                         if (cgroup)
503                                 IOVEC_SET_STRING(iovec[n++], cgroup);
504                 }
505
506                 if (sd_pid_get_session(ucred->pid, &t) >= 0) {
507                         session = strappend("_SYSTEMD_SESSION=", t);
508                         free(t);
509
510                         if (session)
511                                 IOVEC_SET_STRING(iovec[n++], session);
512                 }
513
514                 if (sd_pid_get_service(ucred->pid, &t) >= 0) {
515                         service = strappend("_SYSTEMD_SERVICE=", t);
516                         free(t);
517
518                         if (service)
519                                 IOVEC_SET_STRING(iovec[n++], service);
520                 }
521
522                 if (sd_pid_get_owner_uid(ucred->uid, &owner) >= 0)
523                         if (asprintf(&owner_uid, "_SYSTEMD_OWNER_UID=%lu", (unsigned long) owner) >= 0)
524                                 IOVEC_SET_STRING(iovec[n++], owner_uid);
525         }
526
527         if (tv) {
528                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
529                              (unsigned long long) timeval_load(tv)) >= 0)
530                         IOVEC_SET_STRING(iovec[n++], source_time);
531         }
532
533         /* Note that strictly speaking storing the boot id here is
534          * redundant since the entry includes this in-line
535          * anyway. However, we need this indexed, too. */
536         r = sd_id128_get_boot(&id);
537         if (r >= 0)
538                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
539                         IOVEC_SET_STRING(iovec[n++], boot_id);
540
541         r = sd_id128_get_machine(&id);
542         if (r >= 0)
543                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
544                         IOVEC_SET_STRING(iovec[n++], machine_id);
545
546         t = gethostname_malloc();
547         if (t) {
548                 hostname = strappend("_HOSTNAME=", t);
549                 free(t);
550                 if (hostname)
551                         IOVEC_SET_STRING(iovec[n++], hostname);
552         }
553
554         assert(n <= m);
555
556         server_flush_to_var(s);
557
558 retry:
559         f = find_journal(s, realuid == 0 ? 0 : loginuid);
560         if (!f)
561                 log_warning("Dropping message, as we can't find a place to store the data.");
562         else {
563                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
564
565                 if (r == -E2BIG && !vacuumed) {
566                         log_info("Allocation limit reached.");
567
568                         server_vacuum(s);
569                         vacuumed = true;
570
571                         log_info("Retrying write.");
572                         goto retry;
573                 }
574
575                 if (r < 0)
576                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
577         }
578
579         free(pid);
580         free(uid);
581         free(gid);
582         free(comm);
583         free(exe);
584         free(cmdline);
585         free(source_time);
586         free(boot_id);
587         free(machine_id);
588         free(hostname);
589         free(audit_session);
590         free(audit_loginuid);
591         free(cgroup);
592         free(session);
593         free(owner_uid);
594         free(service);
595 }
596
597 static void dispatch_message(Server *s,
598                              struct iovec *iovec, unsigned n, unsigned m,
599                              struct ucred *ucred,
600                              struct timeval *tv,
601                              int priority) {
602         int rl;
603         char *path = NULL, *c;
604
605         assert(s);
606         assert(iovec || n == 0);
607
608         if (n == 0)
609                 return;
610
611         if (!ucred)
612                 goto finish;
613
614         path = shortened_cgroup_path(ucred->pid);
615         if (!path)
616                 goto finish;
617
618         /* example: /user/lennart/3/foobar
619          *          /system/dbus.service/foobar
620          *
621          * So let's cut of everything past the third /, since that is
622          * wher user directories start */
623
624         c = strchr(path, '/');
625         if (c) {
626                 c = strchr(c+1, '/');
627                 if (c) {
628                         c = strchr(c+1, '/');
629                         if (c)
630                                 *c = 0;
631                 }
632         }
633
634         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
635
636         if (rl == 0) {
637                 free(path);
638                 return;
639         }
640
641         if (rl > 1) {
642                 int j = 0;
643                 char suppress_message[LINE_MAX];
644                 struct iovec suppress_iovec[18];
645
646                 /* Write a suppression message if we suppressed something */
647
648                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
649                 char_array_0(suppress_message);
650
651                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
652                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
653
654                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
655         }
656
657         free(path);
658
659 finish:
660         dispatch_message_real(s, iovec, n, m, ucred, tv);
661 }
662
663 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
664         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
665         struct iovec iovec[19];
666         unsigned n = 0;
667         int priority = LOG_USER | LOG_INFO;
668
669         assert(s);
670         assert(buf);
671
672         parse_syslog_priority((char**) &buf, &priority);
673         skip_syslog_date((char**) &buf);
674
675         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
676                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
677
678         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
679                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
680
681         message = strappend("MESSAGE=", buf);
682         if (message)
683                 IOVEC_SET_STRING(iovec[n++], message);
684
685         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
686
687         free(message);
688         free(syslog_facility);
689         free(syslog_priority);
690 }
691
/* Check whether p[0..l) is an acceptable client-supplied field name.
 *
 * Roughly follows the POSIX recommendations for environment variable names
 * (http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html),
 * with extra restrictions: 1..64 characters, only A-Z, 0-9 and '_', no
 * leading digit, and no leading '_' (underscore-prefixed fields are
 * reserved for trusted data added by the server). */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                char ch = p[i];
                bool is_upper = ch >= 'A' && ch <= 'Z';
                bool is_digit = ch >= '0' && ch <= '9';

                if (!is_upper && !is_digit && ch != '_')
                        return false;
        }

        return true;
}
726
727 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
728         struct iovec *iovec = NULL;
729         unsigned n = 0, m = 0, j;
730         const char *p;
731         size_t remaining;
732         int priority = LOG_INFO;
733
734         assert(s);
735         assert(buffer || n == 0);
736
737         p = buffer;
738         remaining = buffer_size;
739
740         while (remaining > 0) {
741                 const char *e, *q;
742
743                 e = memchr(p, '\n', remaining);
744
745                 if (!e) {
746                         /* Trailing noise, let's ignore it, and flush what we collected */
747                         log_debug("Received message with trailing noise, ignoring.");
748                         break;
749                 }
750
751                 if (e == p) {
752                         /* Entry separator */
753                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
754                         n = 0;
755                         priority = LOG_INFO;
756
757                         p++;
758                         remaining--;
759                         continue;
760                 }
761
762                 if (*p == '.' || *p == '#') {
763                         /* Ignore control commands for now, and
764                          * comments too. */
765                         remaining -= (e - p) + 1;
766                         p = e + 1;
767                         continue;
768                 }
769
770                 /* A property follows */
771
772                 if (n+16 >= m) {
773                         struct iovec *c;
774                         unsigned u;
775
776                         u = MAX((n+16U) * 2U, 4U);
777                         c = realloc(iovec, u * sizeof(struct iovec));
778                         if (!c) {
779                                 log_error("Out of memory");
780                                 break;
781                         }
782
783                         iovec = c;
784                         m = u;
785                 }
786
787                 q = memchr(p, '=', e - p);
788                 if (q) {
789                         if (valid_user_field(p, q - p)) {
790                                 /* If the field name starts with an
791                                  * underscore, skip the variable,
792                                  * since that indidates a trusted
793                                  * field */
794                                 iovec[n].iov_base = (char*) p;
795                                 iovec[n].iov_len = e - p;
796                                 n++;
797
798                                 /* We need to determine the priority
799                                  * of this entry for the rate limiting
800                                  * logic */
801                                 if (e - p == 10 &&
802                                     memcmp(p, "PRIORITY=", 10) == 0 &&
803                                     p[10] >= '0' &&
804                                     p[10] <= '9')
805                                         priority = p[10] - '0';
806                         }
807
808                         remaining -= (e - p) + 1;
809                         p = e + 1;
810                         continue;
811                 } else {
812                         uint64_t l;
813                         char *k;
814
815                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
816                                 log_debug("Failed to parse message, ignoring.");
817                                 break;
818                         }
819
820                         memcpy(&l, e + 1, sizeof(uint64_t));
821                         l = le64toh(l);
822
823                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
824                             e[1+sizeof(uint64_t)+l] != '\n') {
825                                 log_debug("Failed to parse message, ignoring.");
826                                 break;
827                         }
828
829                         k = malloc((e - p) + 1 + l);
830                         if (!k) {
831                                 log_error("Out of memory");
832                                 break;
833                         }
834
835                         memcpy(k, p, e - p);
836                         k[e - p] = '=';
837                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
838
839                         if (valid_user_field(p, e - p)) {
840                                 iovec[n].iov_base = k;
841                                 iovec[n].iov_len = (e - p) + 1 + l;
842                                 n++;
843                         } else
844                                 free(k);
845
846                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
847                         p = e + 1 + sizeof(uint64_t) + l + 1;
848                 }
849         }
850
851         dispatch_message(s, iovec, n, m, ucred, tv, priority);
852
853         for (j = 0; j < n; j++)
854                 if (iovec[j].iov_base < buffer ||
855                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
856                         free(iovec[j].iov_base);
857 }
858
859 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
860         struct iovec iovec[18];
861         char *message = NULL, *syslog_priority = NULL;
862         unsigned n = 0;
863         size_t tag_len;
864         int priority;
865
866         assert(s);
867         assert(p);
868
869         priority = s->priority;
870
871         if (s->priority_prefix &&
872             l > 3 &&
873             p[0] == '<' &&
874             p[1] >= '0' && p[1] <= '7' &&
875             p[2] == '>') {
876
877                 priority = p[1] - '0';
878                 p += 3;
879                 l -= 3;
880         }
881
882         if (l <= 0)
883                 return 0;
884
885         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
886                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
887
888         tag_len = s->tag ? strlen(s->tag) + 2: 0;
889         message = malloc(8 + tag_len + l);
890         if (message) {
891                 memcpy(message, "MESSAGE=", 8);
892
893                 if (s->tag) {
894                         memcpy(message+8, s->tag, tag_len-2);
895                         memcpy(message+8+tag_len-2, ": ", 2);
896                 }
897
898                 memcpy(message+8+tag_len, p, l);
899                 iovec[n].iov_base = message;
900                 iovec[n].iov_len = 8+tag_len+l;
901                 n++;
902         }
903
904         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
905
906         if (s->tee_console) {
907                 int console;
908
909                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
910                 if (console >= 0) {
911                         n = 0;
912                         if (s->tag) {
913                                 IOVEC_SET_STRING(iovec[n++], s->tag);
914                                 IOVEC_SET_STRING(iovec[n++], ": ");
915                         }
916
917                         iovec[n].iov_base = (void*) p;
918                         iovec[n].iov_len = l;
919                         n++;
920
921                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
922
923                         writev(console, iovec, n);
924                 }
925         }
926
927         free(message);
928         free(syslog_priority);
929
930         return 0;
931 }
932
933 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
934         assert(s);
935         assert(p);
936
937         while (l > 0 && strchr(WHITESPACE, *p)) {
938                 l--;
939                 p++;
940         }
941
942         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
943                 l--;
944
945         switch (s->state) {
946
947         case STDOUT_STREAM_TAG:
948
949                 if (l > 0) {
950                         s->tag = strndup(p, l);
951                         if (!s->tag) {
952                                 log_error("Out of memory");
953                                 return -EINVAL;
954                         }
955                 }
956
957                 s->state = STDOUT_STREAM_PRIORITY;
958                 return 0;
959
960         case STDOUT_STREAM_PRIORITY:
961                 if (l != 1 || *p < '0' || *p > '7') {
962                         log_warning("Failed to parse log priority line.");
963                         return -EINVAL;
964                 }
965
966                 s->priority = *p - '0';
967                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
968                 return 0;
969
970         case STDOUT_STREAM_PRIORITY_PREFIX:
971                 if (l != 1 || *p < '0' || *p > '1') {
972                         log_warning("Failed to parse priority prefix line.");
973                         return -EINVAL;
974                 }
975
976                 s->priority_prefix = *p - '0';
977                 s->state = STDOUT_STREAM_TEE_CONSOLE;
978                 return 0;
979
980         case STDOUT_STREAM_TEE_CONSOLE:
981                 if (l != 1 || *p < '0' || *p > '1') {
982                         log_warning("Failed to parse tee to console line.");
983                         return -EINVAL;
984                 }
985
986                 s->tee_console = *p - '0';
987                 s->state = STDOUT_STREAM_RUNNING;
988                 return 0;
989
990         case STDOUT_STREAM_RUNNING:
991                 return stdout_stream_log(s, p, l);
992         }
993
994         assert_not_reached("Unknown stream state");
995 }
996
997 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
998         char *p;
999         size_t remaining;
1000         int r;
1001
1002         assert(s);
1003
1004         p = s->buffer;
1005         remaining = s->length;
1006         for (;;) {
1007                 char *end;
1008                 size_t skip;
1009
1010                 end = memchr(p, '\n', remaining);
1011                 if (!end) {
1012                         if (remaining >= LINE_MAX) {
1013                                 end = p + LINE_MAX;
1014                                 skip = LINE_MAX;
1015                         } else
1016                                 break;
1017                 } else
1018                         skip = end - p + 1;
1019
1020                 r = stdout_stream_line(s, p, end - p);
1021                 if (r < 0)
1022                         return r;
1023
1024                 remaining -= skip;
1025                 p += skip;
1026         }
1027
1028         if (force_flush && remaining > 0) {
1029                 r = stdout_stream_line(s, p, remaining);
1030                 if (r < 0)
1031                         return r;
1032
1033                 p += remaining;
1034                 remaining = 0;
1035         }
1036
1037         if (p > s->buffer) {
1038                 memmove(s->buffer, p, remaining);
1039                 s->length = remaining;
1040         }
1041
1042         return 0;
1043 }
1044
1045 static int stdout_stream_process(StdoutStream *s) {
1046         ssize_t l;
1047         int r;
1048
1049         assert(s);
1050
1051         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1052         if (l < 0) {
1053
1054                 if (errno == EAGAIN)
1055                         return 0;
1056
1057                 log_warning("Failed to read from stream: %m");
1058                 return -errno;
1059         }
1060
1061         if (l == 0) {
1062                 r = stdout_stream_scan(s, true);
1063                 if (r < 0)
1064                         return r;
1065
1066                 return 0;
1067         }
1068
1069         s->length += l;
1070         r = stdout_stream_scan(s, false);
1071         if (r < 0)
1072                 return r;
1073
1074         return 1;
1075
1076 }
1077
1078 static void stdout_stream_free(StdoutStream *s) {
1079         assert(s);
1080
1081         if (s->server) {
1082                 assert(s->server->n_stdout_streams > 0);
1083                 s->server->n_stdout_streams --;
1084                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1085         }
1086
1087         if (s->fd >= 0) {
1088                 if (s->server)
1089                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1090
1091                 close_nointr_nofail(s->fd);
1092         }
1093
1094         free(s->tag);
1095         free(s);
1096 }
1097
1098 static int stdout_stream_new(Server *s) {
1099         StdoutStream *stream;
1100         int fd, r;
1101         socklen_t len;
1102         struct epoll_event ev;
1103
1104         assert(s);
1105
1106         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1107         if (fd < 0) {
1108                 if (errno == EAGAIN)
1109                         return 0;
1110
1111                 log_error("Failed to accept stdout connection: %m");
1112                 return -errno;
1113         }
1114
1115         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1116                 log_warning("Too many stdout streams, refusing connection.");
1117                 close_nointr_nofail(fd);
1118                 return 0;
1119         }
1120
1121         stream = new0(StdoutStream, 1);
1122         if (!stream) {
1123                 log_error("Out of memory.");
1124                 close_nointr_nofail(fd);
1125                 return -ENOMEM;
1126         }
1127
1128         stream->fd = fd;
1129
1130         len = sizeof(stream->ucred);
1131         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1132                 log_error("Failed to determine peer credentials: %m");
1133                 r = -errno;
1134                 goto fail;
1135         }
1136
1137         if (shutdown(fd, SHUT_WR) < 0) {
1138                 log_error("Failed to shutdown writing side of socket: %m");
1139                 r = -errno;
1140                 goto fail;
1141         }
1142
1143         zero(ev);
1144         ev.data.ptr = stream;
1145         ev.events = EPOLLIN;
1146         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1147                 log_error("Failed to add stream to event loop: %m");
1148                 r = -errno;
1149                 goto fail;
1150         }
1151
1152         stream->server = s;
1153         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1154         s->n_stdout_streams ++;
1155
1156         return 0;
1157
1158 fail:
1159         stdout_stream_free(stream);
1160         return r;
1161 }
1162
1163 static int system_journal_open(Server *s) {
1164         int r;
1165         char *fn;
1166         sd_id128_t machine;
1167         char ids[33];
1168
1169         r = sd_id128_get_machine(&machine);
1170         if (r < 0)
1171                 return r;
1172
1173         sd_id128_to_string(machine, ids);
1174
1175         if (!s->system_journal) {
1176
1177                 /* First try to create the machine path, but not the prefix */
1178                 fn = strappend("/var/log/journal/", ids);
1179                 if (!fn)
1180                         return -ENOMEM;
1181                 (void) mkdir(fn, 0755);
1182                 free(fn);
1183
1184                 /* The create the system journal file */
1185                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1186                 if (!fn)
1187                         return -ENOMEM;
1188
1189                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1190                 free(fn);
1191
1192                 if (r >= 0) {
1193                         journal_default_metrics(&s->system_metrics, s->system_journal->fd);
1194
1195                         s->system_journal->metrics = s->system_metrics;
1196                         s->system_journal->compress = s->compress;
1197
1198                         fix_perms(s->system_journal, 0);
1199                 } else if (r < 0) {
1200
1201                         if (r != -ENOENT && r != -EROFS)
1202                                 log_warning("Failed to open system journal: %s", strerror(-r));
1203
1204                         r = 0;
1205                 }
1206         }
1207
1208         if (!s->runtime_journal) {
1209
1210                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1211                 if (!fn)
1212                         return -ENOMEM;
1213
1214                 if (s->system_journal) {
1215
1216                         /* Try to open the runtime journal, but only
1217                          * if it already exists, so that we can flush
1218                          * it into the system journal */
1219
1220                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1221                         free(fn);
1222
1223                         if (r < 0) {
1224                                 if (r != -ENOENT)
1225                                         log_warning("Failed to open runtime journal: %s", strerror(-r));
1226
1227                                 r = 0;
1228                         }
1229
1230                 } else {
1231
1232                         /* OK, we really need the runtime journal, so create
1233                          * it if necessary. */
1234
1235                         (void) mkdir_parents(fn, 0755);
1236                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1237                         free(fn);
1238
1239                         if (r < 0) {
1240                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1241                                 return r;
1242                         }
1243                 }
1244
1245                 if (s->runtime_journal) {
1246                         journal_default_metrics(&s->runtime_metrics, s->runtime_journal->fd);
1247
1248                         s->runtime_journal->metrics = s->runtime_metrics;
1249                         s->runtime_journal->compress = s->compress;
1250
1251                         fix_perms(s->runtime_journal, 0);
1252                 }
1253         }
1254
1255         return r;
1256 }
1257
1258 static int server_flush_to_var(Server *s) {
1259         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1260         Object *o = NULL;
1261         int r;
1262         sd_id128_t machine;
1263         sd_journal *j;
1264         usec_t ts;
1265
1266         assert(s);
1267
1268         if (!s->runtime_journal)
1269                 return 0;
1270
1271         ts = now(CLOCK_MONOTONIC);
1272         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1273                 return 0;
1274
1275         s->var_available_timestamp = ts;
1276
1277         system_journal_open(s);
1278
1279         if (!s->system_journal)
1280                 return 0;
1281
1282         r = sd_id128_get_machine(&machine);
1283         if (r < 0) {
1284                 log_error("Failed to get machine id: %s", strerror(-r));
1285                 return r;
1286         }
1287
1288         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1289         if (r < 0) {
1290                 log_error("Failed to read runtime journal: %s", strerror(-r));
1291                 return r;
1292         }
1293
1294         SD_JOURNAL_FOREACH(j) {
1295                 JournalFile *f;
1296
1297                 f = j->current_file;
1298                 assert(f && f->current_offset > 0);
1299
1300                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1301                 if (r < 0) {
1302                         log_error("Can't read entry: %s", strerror(-r));
1303                         goto finish;
1304                 }
1305
1306                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1307                 if (r == -E2BIG) {
1308                         log_info("Allocation limit reached.");
1309
1310                         journal_file_post_change(s->system_journal);
1311                         server_vacuum(s);
1312
1313                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1314                 }
1315
1316                 if (r < 0) {
1317                         log_error("Can't write entry: %s", strerror(-r));
1318                         goto finish;
1319                 }
1320         }
1321
1322 finish:
1323         journal_file_post_change(s->system_journal);
1324
1325         journal_file_close(s->runtime_journal);
1326         s->runtime_journal = NULL;
1327
1328         if (r >= 0) {
1329                 sd_id128_to_string(machine, path + 17);
1330                 rm_rf(path, false, true, false);
1331         }
1332
1333         return r;
1334 }
1335
1336 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1337         struct msghdr msghdr;
1338         struct iovec iovec;
1339         struct cmsghdr *cmsg;
1340         union {
1341                 struct cmsghdr cmsghdr;
1342                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1343                             CMSG_SPACE(sizeof(struct timeval))];
1344         } control;
1345         union sockaddr_union sa;
1346
1347         assert(s);
1348
1349         zero(msghdr);
1350
1351         zero(iovec);
1352         iovec.iov_base = (void*) buffer;
1353         iovec.iov_len = length;
1354         msghdr.msg_iov = &iovec;
1355         msghdr.msg_iovlen = 1;
1356
1357         zero(sa);
1358         sa.un.sun_family = AF_UNIX;
1359         strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1360         msghdr.msg_name = &sa;
1361         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1362
1363         zero(control);
1364         msghdr.msg_control = &control;
1365         msghdr.msg_controllen = sizeof(control);
1366
1367         cmsg = CMSG_FIRSTHDR(&msghdr);
1368         cmsg->cmsg_level = SOL_SOCKET;
1369         cmsg->cmsg_type = SCM_CREDENTIALS;
1370         cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1371         memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1372         msghdr.msg_controllen = cmsg->cmsg_len;
1373
1374         /* Forward the syslog message we received via /dev/log to
1375          * /run/systemd/syslog. Unfortunately we currently can't set
1376          * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1377
1378         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1379                 return;
1380
1381         if (errno == ESRCH) {
1382                 struct ucred u;
1383
1384                 /* Hmm, presumably the sender process vanished
1385                  * by now, so let's fix it as good as we
1386                  * can, and retry */
1387
1388                 u = *ucred;
1389                 u.pid = getpid();
1390                 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1391
1392                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1393                         return;
1394         }
1395
1396         log_debug("Failed to forward syslog message: %m");
1397 }
1398
1399 static int process_event(Server *s, struct epoll_event *ev) {
1400         assert(s);
1401
1402         if (ev->data.fd == s->signal_fd) {
1403                 struct signalfd_siginfo sfsi;
1404                 ssize_t n;
1405
1406                 if (ev->events != EPOLLIN) {
1407                         log_info("Got invalid event from epoll.");
1408                         return -EIO;
1409                 }
1410
1411                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1412                 if (n != sizeof(sfsi)) {
1413
1414                         if (n >= 0)
1415                                 return -EIO;
1416
1417                         if (errno == EINTR || errno == EAGAIN)
1418                                 return 0;
1419
1420                         return -errno;
1421                 }
1422
1423                 if (sfsi.ssi_signo == SIGUSR1) {
1424                         server_flush_to_var(s);
1425                         return 0;
1426                 }
1427
1428                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1429                 return 0;
1430
1431         } else if (ev->data.fd == s->native_fd ||
1432                    ev->data.fd == s->syslog_fd) {
1433
1434                 if (ev->events != EPOLLIN) {
1435                         log_info("Got invalid event from epoll.");
1436                         return -EIO;
1437                 }
1438
1439                 for (;;) {
1440                         struct msghdr msghdr;
1441                         struct iovec iovec;
1442                         struct ucred *ucred = NULL;
1443                         struct timeval *tv = NULL;
1444                         struct cmsghdr *cmsg;
1445                         union {
1446                                 struct cmsghdr cmsghdr;
1447                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1448                                             CMSG_SPACE(sizeof(struct timeval))];
1449                         } control;
1450                         ssize_t n;
1451                         int v;
1452
1453                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1454                                 log_error("SIOCINQ failed: %m");
1455                                 return -errno;
1456                         }
1457
1458                         if (v <= 0)
1459                                 return 1;
1460
1461                         if (s->buffer_size < (size_t) v) {
1462                                 void *b;
1463                                 size_t l;
1464
1465                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1466                                 b = realloc(s->buffer, l+1);
1467
1468                                 if (!b) {
1469                                         log_error("Couldn't increase buffer.");
1470                                         return -ENOMEM;
1471                                 }
1472
1473                                 s->buffer_size = l;
1474                                 s->buffer = b;
1475                         }
1476
1477                         zero(iovec);
1478                         iovec.iov_base = s->buffer;
1479                         iovec.iov_len = s->buffer_size;
1480
1481                         zero(control);
1482                         zero(msghdr);
1483                         msghdr.msg_iov = &iovec;
1484                         msghdr.msg_iovlen = 1;
1485                         msghdr.msg_control = &control;
1486                         msghdr.msg_controllen = sizeof(control);
1487
1488                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1489                         if (n < 0) {
1490
1491                                 if (errno == EINTR || errno == EAGAIN)
1492                                         return 1;
1493
1494                                 log_error("recvmsg() failed: %m");
1495                                 return -errno;
1496                         }
1497
1498                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1499
1500                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1501                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1502                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1503                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1504                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1505                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1506                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1507                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1508                         }
1509
1510                         if (ev->data.fd == s->syslog_fd) {
1511                                 char *e;
1512
1513                                 e = memchr(s->buffer, '\n', n);
1514                                 if (e)
1515                                         *e = 0;
1516                                 else
1517                                         s->buffer[n] = 0;
1518
1519                                 forward_syslog(s, s->buffer, n, ucred, tv);
1520                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1521                         } else
1522                                 process_native_message(s, s->buffer, n, ucred, tv);
1523                 }
1524
1525                 return 1;
1526
1527         } else if (ev->data.fd == s->stdout_fd) {
1528
1529                 if (ev->events != EPOLLIN) {
1530                         log_info("Got invalid event from epoll.");
1531                         return -EIO;
1532                 }
1533
1534                 stdout_stream_new(s);
1535                 return 1;
1536
1537         } else {
1538                 StdoutStream *stream;
1539
1540                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1541                         log_info("Got invalid event from epoll.");
1542                         return -EIO;
1543                 }
1544
1545                 /* If it is none of the well-known fds, it must be an
1546                  * stdout stream fd. Note that this is a bit ugly here
1547                  * (since we rely that none of the well-known fds
1548                  * could be interpreted as pointer), but nonetheless
1549                  * safe, since the well-known fds would never get an
1550                  * fd > 4096, i.e. beyond the first memory page */
1551
1552                 stream = ev->data.ptr;
1553
1554                 if (stdout_stream_process(stream) <= 0)
1555                         stdout_stream_free(stream);
1556
1557                 return 1;
1558         }
1559
1560         log_error("Unknown event.");
1561         return 0;
1562 }
1563
1564 static int open_syslog_socket(Server *s) {
1565         union sockaddr_union sa;
1566         int one, r;
1567         struct epoll_event ev;
1568         struct timeval tv;
1569
1570         assert(s);
1571
1572         if (s->syslog_fd < 0) {
1573
1574                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1575                 if (s->syslog_fd < 0) {
1576                         log_error("socket() failed: %m");
1577                         return -errno;
1578                 }
1579
1580                 zero(sa);
1581                 sa.un.sun_family = AF_UNIX;
1582                 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1583
1584                 unlink(sa.un.sun_path);
1585
1586                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1587                 if (r < 0) {
1588                         log_error("bind() failed: %m");
1589                         return -errno;
1590                 }
1591
1592                 chmod(sa.un.sun_path, 0666);
1593         }
1594
1595         one = 1;
1596         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1597         if (r < 0) {
1598                 log_error("SO_PASSCRED failed: %m");
1599                 return -errno;
1600         }
1601
1602         one = 1;
1603         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1604         if (r < 0) {
1605                 log_error("SO_TIMESTAMP failed: %m");
1606                 return -errno;
1607         }
1608
1609         /* Since we use the same socket for forwarding this to some
1610          * other syslog implementation, make sure we don't hang
1611          * forever */
1612         timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1613         if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1614                 log_error("SO_SNDTIMEO failed: %m");
1615                 return -errno;
1616         }
1617
1618         zero(ev);
1619         ev.events = EPOLLIN;
1620         ev.data.fd = s->syslog_fd;
1621         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1622                 log_error("Failed to add syslog server fd to epoll object: %m");
1623                 return -errno;
1624         }
1625
1626         return 0;
1627 }
1628
1629 static int open_native_socket(Server*s) {
1630         union sockaddr_union sa;
1631         int one, r;
1632         struct epoll_event ev;
1633
1634         assert(s);
1635
1636         if (s->native_fd < 0) {
1637
1638                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1639                 if (s->native_fd < 0) {
1640                         log_error("socket() failed: %m");
1641                         return -errno;
1642                 }
1643
1644                 zero(sa);
1645                 sa.un.sun_family = AF_UNIX;
1646                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1647
1648                 unlink(sa.un.sun_path);
1649
1650                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1651                 if (r < 0) {
1652                         log_error("bind() failed: %m");
1653                         return -errno;
1654                 }
1655
1656                 chmod(sa.un.sun_path, 0666);
1657         }
1658
1659         one = 1;
1660         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1661         if (r < 0) {
1662                 log_error("SO_PASSCRED failed: %m");
1663                 return -errno;
1664         }
1665
1666         one = 1;
1667         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1668         if (r < 0) {
1669                 log_error("SO_TIMESTAMP failed: %m");
1670                 return -errno;
1671         }
1672
1673         zero(ev);
1674         ev.events = EPOLLIN;
1675         ev.data.fd = s->native_fd;
1676         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1677                 log_error("Failed to add native server fd to epoll object: %m");
1678                 return -errno;
1679         }
1680
1681         return 0;
1682 }
1683
1684 static int open_stdout_socket(Server *s) {
1685         union sockaddr_union sa;
1686         int r;
1687         struct epoll_event ev;
1688
1689         assert(s);
1690
1691         if (s->stdout_fd < 0) {
1692
1693                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1694                 if (s->stdout_fd < 0) {
1695                         log_error("socket() failed: %m");
1696                         return -errno;
1697                 }
1698
1699                 zero(sa);
1700                 sa.un.sun_family = AF_UNIX;
1701                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1702
1703                 unlink(sa.un.sun_path);
1704
1705                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1706                 if (r < 0) {
1707                         log_error("bind() failed: %m");
1708                         return -errno;
1709                 }
1710
1711                 chmod(sa.un.sun_path, 0666);
1712
1713                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1714                         log_error("liste() failed: %m");
1715                         return -errno;
1716                 }
1717         }
1718
1719         zero(ev);
1720         ev.events = EPOLLIN;
1721         ev.data.fd = s->stdout_fd;
1722         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1723                 log_error("Failed to add stdout server fd to epoll object: %m");
1724                 return -errno;
1725         }
1726
1727         return 0;
1728 }
1729
1730 static int open_signalfd(Server *s) {
1731         sigset_t mask;
1732         struct epoll_event ev;
1733
1734         assert(s);
1735
1736         assert_se(sigemptyset(&mask) == 0);
1737         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1738         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1739
1740         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1741         if (s->signal_fd < 0) {
1742                 log_error("signalfd(): %m");
1743                 return -errno;
1744         }
1745
1746         zero(ev);
1747         ev.events = EPOLLIN;
1748         ev.data.fd = s->signal_fd;
1749
1750         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1751                 log_error("epoll_ctl(): %m");
1752                 return -errno;
1753         }
1754
1755         return 0;
1756 }
1757
1758 static int server_init(Server *s) {
1759         int n, r, fd;
1760
1761         assert(s);
1762
1763         zero(*s);
1764         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1765         s->compress = true;
1766
1767         memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
1768         memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
1769
1770         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1771         if (!s->user_journals) {
1772                 log_error("Out of memory.");
1773                 return -ENOMEM;
1774         }
1775
1776         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1777         if (s->epoll_fd < 0) {
1778                 log_error("Failed to create epoll object: %m");
1779                 return -errno;
1780         }
1781
1782         n = sd_listen_fds(true);
1783         if (n < 0) {
1784                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1785                 return n;
1786         }
1787
1788         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1789
1790                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1791
1792                         if (s->native_fd >= 0) {
1793                                 log_error("Too many native sockets passed.");
1794                                 return -EINVAL;
1795                         }
1796
1797                         s->native_fd = fd;
1798
1799                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1800
1801                         if (s->stdout_fd >= 0) {
1802                                 log_error("Too many stdout sockets passed.");
1803                                 return -EINVAL;
1804                         }
1805
1806                         s->stdout_fd = fd;
1807
1808                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1809
1810                         if (s->syslog_fd >= 0) {
1811                                 log_error("Too many /dev/log sockets passed.");
1812                                 return -EINVAL;
1813                         }
1814
1815                         s->syslog_fd = fd;
1816
1817                 } else {
1818                         log_error("Unknown socket passed.");
1819                         return -EINVAL;
1820                 }
1821         }
1822
1823         r = open_syslog_socket(s);
1824         if (r < 0)
1825                 return r;
1826
1827         r = open_native_socket(s);
1828         if (r < 0)
1829                 return r;
1830
1831         r = open_stdout_socket(s);
1832         if (r < 0)
1833                 return r;
1834
1835         r = system_journal_open(s);
1836         if (r < 0)
1837                 return r;
1838
1839         r = open_signalfd(s);
1840         if (r < 0)
1841                 return r;
1842
1843         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1844         if (!s->rate_limit)
1845                 return -ENOMEM;
1846
1847         return 0;
1848 }
1849
1850 static void server_done(Server *s) {
1851         JournalFile *f;
1852         assert(s);
1853
1854         while (s->stdout_streams)
1855                 stdout_stream_free(s->stdout_streams);
1856
1857         if (s->system_journal)
1858                 journal_file_close(s->system_journal);
1859
1860         if (s->runtime_journal)
1861                 journal_file_close(s->runtime_journal);
1862
1863         while ((f = hashmap_steal_first(s->user_journals)))
1864                 journal_file_close(f);
1865
1866         hashmap_free(s->user_journals);
1867
1868         if (s->epoll_fd >= 0)
1869                 close_nointr_nofail(s->epoll_fd);
1870
1871         if (s->signal_fd >= 0)
1872                 close_nointr_nofail(s->signal_fd);
1873
1874         if (s->syslog_fd >= 0)
1875                 close_nointr_nofail(s->syslog_fd);
1876
1877         if (s->native_fd >= 0)
1878                 close_nointr_nofail(s->native_fd);
1879
1880         if (s->stdout_fd >= 0)
1881                 close_nointr_nofail(s->stdout_fd);
1882
1883         if (s->rate_limit)
1884                 journal_rate_limit_free(s->rate_limit);
1885
1886         free(s->buffer);
1887 }
1888
1889 int main(int argc, char *argv[]) {
1890         Server server;
1891         int r;
1892
1893         /* if (getppid() != 1) { */
1894         /*         log_error("This program should be invoked by init only."); */
1895         /*         return EXIT_FAILURE; */
1896         /* } */
1897
1898         if (argc > 1) {
1899                 log_error("This program does not take arguments.");
1900                 return EXIT_FAILURE;
1901         }
1902
1903         log_set_target(LOG_TARGET_CONSOLE);
1904         log_parse_environment();
1905         log_open();
1906
1907         umask(0022);
1908
1909         r = server_init(&server);
1910         if (r < 0)
1911                 goto finish;
1912
1913         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1914
1915         sd_notify(false,
1916                   "READY=1\n"
1917                   "STATUS=Processing requests...");
1918
1919         server_vacuum(&server);
1920         server_flush_to_var(&server);
1921
1922         for (;;) {
1923                 struct epoll_event event;
1924
1925                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1926                 if (r < 0) {
1927
1928                         if (errno == EINTR)
1929                                 continue;
1930
1931                         log_error("epoll_wait() failed: %m");
1932                         r = -errno;
1933                         goto finish;
1934                 } else if (r == 0)
1935                         break;
1936
1937                 r = process_event(&server, &event);
1938                 if (r < 0)
1939                         goto finish;
1940                 else if (r == 0)
1941                         break;
1942         }
1943
1944         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1945
1946 finish:
1947         sd_notify(false,
1948                   "STATUS=Shutting down...");
1949
1950         server_done(&server);
1951
1952         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1953 }