chiark / gitweb /
1faf570161a21457805e5836bb1590f07f0cb850
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "sd-login.h"
45 #include "journal-internal.h"
46
/* Maximum number of per-user journal files kept open at once;
 * find_journal() closes already-open ones when this is reached. */
#define USER_JOURNALS_MAX 1024
/* NOTE(review): presumably the cap on concurrent stdout streams
 * (Server.n_stdout_streams) — not used in the visible code, confirm. */
#define STDOUT_STREAMS_MAX 4096

/* NOTE(review): presumably the default interval and burst handed to the
 * journal rate limiter — its setup is not visible here, confirm. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long available_space() reuses its cached result before rescanning
 * the journal directory. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not used in the visible code; presumably the minimum
 * interval between checks whether /var is available — confirm. */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not used in the visible code; presumably a timeout for
 * syslog-related I/O — confirm. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)

/* Forward typedef so Server can hold a list of streams. */
typedef struct StdoutStream StdoutStream;
60
/* Global state of the journal daemon. */
typedef struct Server {
        /* Event-loop and socket file descriptors; how they are set up is
         * not visible in this part of the file. */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* Journal file for the /run/log/journal/<machine-id> directory.
         * While it is open, find_journal() sends every message here. */
        JournalFile *runtime_journal;
        /* Journal file for the /var/log/journal/<machine-id> directory. */
        JournalFile *system_journal;
        /* Per-user journal files under /var, keyed by UINT32_TO_PTR(uid);
         * at most USER_JOURNALS_MAX are kept open. */
        Hashmap *user_journals;

        /* Sequence number passed to journal_file_append_entry(). */
        uint64_t seqnum;

        /* NOTE(review): presumably a receive buffer and its allocated
         * size — not used in the visible code, confirm. */
        char *buffer;
        size_t buffer_size;

        /* Rate limiter consulted by dispatch_message(), keyed by a
         * truncated cgroup path. */
        JournalRateLimit *rate_limit;

        /* Size limits (max_use, keep_free) for /run and /var respectively,
         * used by available_space() and server_vacuum(). */
        JournalMetrics runtime_metrics;
        JournalMetrics system_metrics;

        /* Copied into each newly opened per-user journal file. */
        bool compress;

        /* Last result of available_space() and the CLOCK_MONOTONIC time it
         * was computed; server_vacuum() resets the timestamp to 0 to force
         * a rescan. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* NOTE(review): not used in the visible code; presumably paired
         * with RECHECK_VAR_AVAILABLE_USEC — confirm. */
        uint64_t var_available_timestamp;

        /* List of StdoutStream objects and their count (presumably bounded
         * by STDOUT_STREAMS_MAX). */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
92
/* Handshake stage of a stdout stream. stdout_stream_line() consumes one
 * line per stage and advances in declaration order. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,              /* tag line; an empty line means no tag */
        STDOUT_STREAM_PRIORITY,         /* default priority, a single '0'..'7' */
        STDOUT_STREAM_PRIORITY_PREFIX,  /* '0'/'1': honour "<N>" line prefixes */
        STDOUT_STREAM_TEE_CONSOLE,      /* '0'/'1': also copy lines to /dev/console */
        STDOUT_STREAM_RUNNING           /* presumably: remaining lines are log messages */
} StdoutStreamState;
100
/* Per-stream state for a client logging via the stdout socket. */
struct StdoutStream {
        Server *server;
        /* Current handshake stage. */
        StdoutStreamState state;

        /* NOTE(review): presumably the stream's connection socket — confirm. */
        int fd;

        /* Peer credentials, handed to dispatch_message() for every line. */
        struct ucred ucred;

        /* Prefixed as "<tag>: " to each message; NULL means no tag. */
        char *tag;
        /* Default priority set during the handshake (0..7). */
        int priority;
        /* If set, a leading "<N>" on a line overrides priority. */
        bool priority_prefix:1;
        /* If set, each line is also written to /dev/console. */
        bool tee_console:1;

        /* NOTE(review): presumably accumulates received bytes until a full
         * line is available, with length as the fill level — confirm. */
        char buffer[LINE_MAX+1];
        size_t length;

        /* Presumably the linkage in Server.stdout_streams. */
        LIST_FIELDS(StdoutStream, stdout_stream);
};
119
/* Forward declaration: dispatch_message_real() calls this before choosing
 * the target journal file; the definition is not in the visible part of
 * this file. */
static int server_flush_to_var(Server *s);
121
122 static uint64_t available_space(Server *s) {
123         char ids[33], *p;
124         const char *f;
125         sd_id128_t machine;
126         struct statvfs ss;
127         uint64_t sum = 0, avail = 0, ss_avail = 0;
128         int r;
129         DIR *d;
130         usec_t ts;
131         JournalMetrics *m;
132
133         ts = now(CLOCK_MONOTONIC);
134
135         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
136                 return s->cached_available_space;
137
138         r = sd_id128_get_machine(&machine);
139         if (r < 0)
140                 return 0;
141
142         if (s->system_journal) {
143                 f = "/var/log/journal/";
144                 m = &s->system_metrics;
145         } else {
146                 f = "/run/log/journal/";
147                 m = &s->runtime_metrics;
148         }
149
150         assert(m);
151
152         p = strappend(f, sd_id128_to_string(machine, ids));
153         if (!p)
154                 return 0;
155
156         d = opendir(p);
157         free(p);
158
159         if (!d)
160                 return 0;
161
162         if (fstatvfs(dirfd(d), &ss) < 0)
163                 goto finish;
164
165         for (;;) {
166                 struct stat st;
167                 struct dirent buf, *de;
168                 int k;
169
170                 k = readdir_r(d, &buf, &de);
171                 if (k != 0) {
172                         r = -k;
173                         goto finish;
174                 }
175
176                 if (!de)
177                         break;
178
179                 if (!dirent_is_file_with_suffix(de, ".journal"))
180                         continue;
181
182                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
183                         continue;
184
185                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
186         }
187
188         avail = sum >= m->max_use ? 0 : m->max_use - sum;
189
190         ss_avail = ss.f_bsize * ss.f_bavail;
191
192         ss_avail = ss_avail < m->keep_free ? 0 : ss_avail - m->keep_free;
193
194         if (ss_avail < avail)
195                 avail = ss_avail;
196
197         s->cached_available_space = avail;
198         s->cached_available_space_timestamp = ts;
199
200 finish:
201         closedir(d);
202
203         return avail;
204 }
205
206 static void fix_perms(JournalFile *f, uid_t uid) {
207         acl_t acl;
208         acl_entry_t entry;
209         acl_permset_t permset;
210         int r;
211
212         assert(f);
213
214         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
215         if (r < 0)
216                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
217
218         if (uid <= 0)
219                 return;
220
221         acl = acl_get_fd(f->fd);
222         if (!acl) {
223                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
224                 return;
225         }
226
227         r = acl_find_uid(acl, uid, &entry);
228         if (r <= 0) {
229
230                 if (acl_create_entry(&acl, &entry) < 0 ||
231                     acl_set_tag_type(entry, ACL_USER) < 0 ||
232                     acl_set_qualifier(entry, &uid) < 0) {
233                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
234                         goto finish;
235                 }
236         }
237
238         if (acl_get_permset(entry, &permset) < 0 ||
239             acl_add_perm(permset, ACL_READ) < 0 ||
240             acl_calc_mask(&acl) < 0) {
241                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
242                 goto finish;
243         }
244
245         if (acl_set_fd(f->fd, acl) < 0)
246                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
247
248 finish:
249         acl_free(acl);
250 }
251
252 static JournalFile* find_journal(Server *s, uid_t uid) {
253         char *p;
254         int r;
255         JournalFile *f;
256         char ids[33];
257         sd_id128_t machine;
258
259         assert(s);
260
261         /* We split up user logs only on /var, not on /run. If the
262          * runtime file is open, we write to it exclusively, in order
263          * to guarantee proper order as soon as we flush /run to
264          * /var and close the runtime file. */
265
266         if (s->runtime_journal)
267                 return s->runtime_journal;
268
269         if (uid <= 0)
270                 return s->system_journal;
271
272         r = sd_id128_get_machine(&machine);
273         if (r < 0)
274                 return s->system_journal;
275
276         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
277         if (f)
278                 return f;
279
280         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
281                 return s->system_journal;
282
283         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
284                 /* Too many open? Then let's close one */
285                 f = hashmap_steal_first(s->user_journals);
286                 assert(f);
287                 journal_file_close(f);
288         }
289
290         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
291         free(p);
292
293         if (r < 0)
294                 return s->system_journal;
295
296         fix_perms(f, uid);
297         f->metrics = s->system_metrics;
298         f->compress = s->compress;
299
300         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
301         if (r < 0) {
302                 journal_file_close(f);
303                 return s->system_journal;
304         }
305
306         return f;
307 }
308
309 static void server_vacuum(Server *s) {
310         Iterator i;
311         void *k;
312         char *p;
313         char ids[33];
314         sd_id128_t machine;
315         int r;
316         JournalFile *f;
317
318         log_info("Rotating...");
319
320         if (s->runtime_journal) {
321                 r = journal_file_rotate(&s->runtime_journal);
322                 if (r < 0)
323                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
324         }
325
326         if (s->system_journal) {
327                 r = journal_file_rotate(&s->system_journal);
328                 if (r < 0)
329                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
330         }
331
332         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
333                 r = journal_file_rotate(&f);
334                 if (r < 0)
335                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
336                 else
337                         hashmap_replace(s->user_journals, k, f);
338         }
339
340         log_info("Vacuuming...");
341
342         r = sd_id128_get_machine(&machine);
343         if (r < 0) {
344                 log_error("Failed to get machine ID: %s", strerror(-r));
345                 return;
346         }
347
348         sd_id128_to_string(machine, ids);
349
350         if (s->system_journal) {
351                 if (asprintf(&p, "/var/log/journal/%s", ids) < 0) {
352                         log_error("Out of memory.");
353                         return;
354                 }
355
356                 r = journal_directory_vacuum(p, s->system_metrics.max_use, s->system_metrics.keep_free);
357                 if (r < 0 && r != -ENOENT)
358                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
359                 free(p);
360         }
361
362
363         if (s->runtime_journal) {
364                 if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
365                         log_error("Out of memory.");
366                         return;
367                 }
368
369                 r = journal_directory_vacuum(p, s->runtime_metrics.max_use, s->runtime_metrics.keep_free);
370                 if (r < 0 && r != -ENOENT)
371                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
372                 free(p);
373         }
374
375         s->cached_available_space_timestamp = 0;
376 }
377
378 static char *shortened_cgroup_path(pid_t pid) {
379         int r;
380         char *process_path, *init_path, *path;
381
382         assert(pid > 0);
383
384         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
385         if (r < 0)
386                 return NULL;
387
388         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
389         if (r < 0) {
390                 free(process_path);
391                 return NULL;
392         }
393
394         if (streq(init_path, "/"))
395                 init_path[0] = 0;
396
397         if (startswith(process_path, init_path)) {
398                 char *p;
399
400                 p = strdup(process_path + strlen(init_path));
401                 if (!p) {
402                         free(process_path);
403                         free(init_path);
404                         return NULL;
405                 }
406                 path = p;
407         } else {
408                 path = process_path;
409                 process_path = NULL;
410         }
411
412         free(process_path);
413         free(init_path);
414
415         return path;
416 }
417
418 static void dispatch_message_real(Server *s,
419                              struct iovec *iovec, unsigned n, unsigned m,
420                              struct ucred *ucred,
421                              struct timeval *tv) {
422
423         char *pid = NULL, *uid = NULL, *gid = NULL,
424                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
425                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
426                 *audit_session = NULL, *audit_loginuid = NULL,
427                 *exe = NULL, *cgroup = NULL, *session = NULL,
428                 *owner_uid = NULL, *service = NULL;
429
430         char idbuf[33];
431         sd_id128_t id;
432         int r;
433         char *t;
434         uid_t loginuid = 0, realuid = 0;
435         JournalFile *f;
436         bool vacuumed = false;
437
438         assert(s);
439         assert(iovec);
440         assert(n > 0);
441         assert(n + 16 <= m);
442
443         if (ucred) {
444                 uint32_t audit;
445                 uid_t owner;
446
447                 realuid = ucred->uid;
448
449                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
450                         IOVEC_SET_STRING(iovec[n++], pid);
451
452                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
453                         IOVEC_SET_STRING(iovec[n++], uid);
454
455                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
456                         IOVEC_SET_STRING(iovec[n++], gid);
457
458                 r = get_process_comm(ucred->pid, &t);
459                 if (r >= 0) {
460                         comm = strappend("_COMM=", t);
461                         free(t);
462
463                         if (comm)
464                                 IOVEC_SET_STRING(iovec[n++], comm);
465                 }
466
467                 r = get_process_exe(ucred->pid, &t);
468                 if (r >= 0) {
469                         exe = strappend("_EXE=", t);
470                         free(t);
471
472                         if (comm)
473                                 IOVEC_SET_STRING(iovec[n++], exe);
474                 }
475
476                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
477                 if (r >= 0) {
478                         cmdline = strappend("_CMDLINE=", t);
479                         free(t);
480
481                         if (cmdline)
482                                 IOVEC_SET_STRING(iovec[n++], cmdline);
483                 }
484
485                 r = audit_session_from_pid(ucred->pid, &audit);
486                 if (r >= 0)
487                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) audit) >= 0)
488                                 IOVEC_SET_STRING(iovec[n++], audit_session);
489
490                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
491                 if (r >= 0)
492                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
493                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
494
495                 t = shortened_cgroup_path(ucred->pid);
496                 if (t) {
497                         cgroup = strappend("_SYSTEMD_CGROUP=", t);
498                         free(t);
499
500                         if (cgroup)
501                                 IOVEC_SET_STRING(iovec[n++], cgroup);
502                 }
503
504                 if (sd_pid_get_session(ucred->pid, &t) >= 0) {
505                         session = strappend("_SYSTEMD_SESSION=", t);
506                         free(t);
507
508                         if (session)
509                                 IOVEC_SET_STRING(iovec[n++], session);
510                 }
511
512                 if (sd_pid_get_service(ucred->pid, &t) >= 0) {
513                         service = strappend("_SYSTEMD_SERVICE=", t);
514                         free(t);
515
516                         if (service)
517                                 IOVEC_SET_STRING(iovec[n++], service);
518                 }
519
520                 if (sd_pid_get_owner_uid(ucred->uid, &owner) >= 0)
521                         if (asprintf(&owner_uid, "_SYSTEMD_OWNER_UID=%lu", (unsigned long) owner) >= 0)
522                                 IOVEC_SET_STRING(iovec[n++], owner_uid);
523         }
524
525         if (tv) {
526                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
527                              (unsigned long long) timeval_load(tv)) >= 0)
528                         IOVEC_SET_STRING(iovec[n++], source_time);
529         }
530
531         /* Note that strictly speaking storing the boot id here is
532          * redundant since the entry includes this in-line
533          * anyway. However, we need this indexed, too. */
534         r = sd_id128_get_boot(&id);
535         if (r >= 0)
536                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
537                         IOVEC_SET_STRING(iovec[n++], boot_id);
538
539         r = sd_id128_get_machine(&id);
540         if (r >= 0)
541                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
542                         IOVEC_SET_STRING(iovec[n++], machine_id);
543
544         t = gethostname_malloc();
545         if (t) {
546                 hostname = strappend("_HOSTNAME=", t);
547                 free(t);
548                 if (hostname)
549                         IOVEC_SET_STRING(iovec[n++], hostname);
550         }
551
552         assert(n <= m);
553
554         server_flush_to_var(s);
555
556 retry:
557         f = find_journal(s, realuid == 0 ? 0 : loginuid);
558         if (!f)
559                 log_warning("Dropping message, as we can't find a place to store the data.");
560         else {
561                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
562
563                 if (r == -E2BIG && !vacuumed) {
564                         log_info("Allocation limit reached.");
565
566                         server_vacuum(s);
567                         vacuumed = true;
568
569                         log_info("Retrying write.");
570                         goto retry;
571                 }
572
573                 if (r < 0)
574                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
575         }
576
577         free(pid);
578         free(uid);
579         free(gid);
580         free(comm);
581         free(exe);
582         free(cmdline);
583         free(source_time);
584         free(boot_id);
585         free(machine_id);
586         free(hostname);
587         free(audit_session);
588         free(audit_loginuid);
589         free(cgroup);
590         free(session);
591         free(owner_uid);
592         free(service);
593 }
594
595 static void dispatch_message(Server *s,
596                              struct iovec *iovec, unsigned n, unsigned m,
597                              struct ucred *ucred,
598                              struct timeval *tv,
599                              int priority) {
600         int rl;
601         char *path = NULL, *c;
602
603         assert(s);
604         assert(iovec || n == 0);
605
606         if (n == 0)
607                 return;
608
609         if (!ucred)
610                 goto finish;
611
612         path = shortened_cgroup_path(ucred->pid);
613         if (!path)
614                 goto finish;
615
616         /* example: /user/lennart/3/foobar
617          *          /system/dbus.service/foobar
618          *
619          * So let's cut of everything past the third /, since that is
620          * wher user directories start */
621
622         c = strchr(path, '/');
623         if (c) {
624                 c = strchr(c+1, '/');
625                 if (c) {
626                         c = strchr(c+1, '/');
627                         if (c)
628                                 *c = 0;
629                 }
630         }
631
632         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
633
634         if (rl == 0) {
635                 free(path);
636                 return;
637         }
638
639         if (rl > 1) {
640                 int j = 0;
641                 char suppress_message[LINE_MAX];
642                 struct iovec suppress_iovec[18];
643
644                 /* Write a suppression message if we suppressed something */
645
646                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
647                 char_array_0(suppress_message);
648
649                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
650                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
651
652                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
653         }
654
655         free(path);
656
657 finish:
658         dispatch_message_real(s, iovec, n, m, ucred, tv);
659 }
660
661 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
662         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
663         struct iovec iovec[19];
664         unsigned n = 0;
665         int priority = LOG_USER | LOG_INFO;
666
667         assert(s);
668         assert(buf);
669
670         parse_syslog_priority((char**) &buf, &priority);
671         skip_syslog_date((char**) &buf);
672
673         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
674                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
675
676         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
677                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
678
679         message = strappend("MESSAGE=", buf);
680         if (message)
681                 IOVEC_SET_STRING(iovec[n++], message);
682
683         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
684
685         free(message);
686         free(syslog_facility);
687         free(syslog_priority);
688 }
689
/* Check whether the first l bytes of p form a field name a client may
 * set. The rules roughly follow the POSIX recommendations for environment
 * variable names, with a few extra restrictions:
 *
 *   - 1 to 64 characters long;
 *   - only 'A'-'Z', '0'-'9' and '_';
 *   - must not start with a digit;
 *   - must not start with '_' (reserved for trusted fields).
 *
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                char ch = p[i];
                bool upper = ch >= 'A' && ch <= 'Z';
                bool digit = ch >= '0' && ch <= '9';

                if (!upper && !digit && ch != '_')
                        return false;
        }

        return true;
}
724
725 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
726         struct iovec *iovec = NULL;
727         unsigned n = 0, m = 0, j;
728         const char *p;
729         size_t remaining;
730         int priority = LOG_INFO;
731
732         assert(s);
733         assert(buffer || n == 0);
734
735         p = buffer;
736         remaining = buffer_size;
737
738         while (remaining > 0) {
739                 const char *e, *q;
740
741                 e = memchr(p, '\n', remaining);
742
743                 if (!e) {
744                         /* Trailing noise, let's ignore it, and flush what we collected */
745                         log_debug("Received message with trailing noise, ignoring.");
746                         break;
747                 }
748
749                 if (e == p) {
750                         /* Entry separator */
751                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
752                         n = 0;
753                         priority = LOG_INFO;
754
755                         p++;
756                         remaining--;
757                         continue;
758                 }
759
760                 if (*p == '.' || *p == '#') {
761                         /* Ignore control commands for now, and
762                          * comments too. */
763                         remaining -= (e - p) + 1;
764                         p = e + 1;
765                         continue;
766                 }
767
768                 /* A property follows */
769
770                 if (n+16 >= m) {
771                         struct iovec *c;
772                         unsigned u;
773
774                         u = MAX((n+16U) * 2U, 4U);
775                         c = realloc(iovec, u * sizeof(struct iovec));
776                         if (!c) {
777                                 log_error("Out of memory");
778                                 break;
779                         }
780
781                         iovec = c;
782                         m = u;
783                 }
784
785                 q = memchr(p, '=', e - p);
786                 if (q) {
787                         if (valid_user_field(p, q - p)) {
788                                 /* If the field name starts with an
789                                  * underscore, skip the variable,
790                                  * since that indidates a trusted
791                                  * field */
792                                 iovec[n].iov_base = (char*) p;
793                                 iovec[n].iov_len = e - p;
794                                 n++;
795
796                                 /* We need to determine the priority
797                                  * of this entry for the rate limiting
798                                  * logic */
799                                 if (e - p == 10 &&
800                                     memcmp(p, "PRIORITY=", 10) == 0 &&
801                                     p[10] >= '0' &&
802                                     p[10] <= '9')
803                                         priority = p[10] - '0';
804                         }
805
806                         remaining -= (e - p) + 1;
807                         p = e + 1;
808                         continue;
809                 } else {
810                         uint64_t l;
811                         char *k;
812
813                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
814                                 log_debug("Failed to parse message, ignoring.");
815                                 break;
816                         }
817
818                         memcpy(&l, e + 1, sizeof(uint64_t));
819                         l = le64toh(l);
820
821                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
822                             e[1+sizeof(uint64_t)+l] != '\n') {
823                                 log_debug("Failed to parse message, ignoring.");
824                                 break;
825                         }
826
827                         k = malloc((e - p) + 1 + l);
828                         if (!k) {
829                                 log_error("Out of memory");
830                                 break;
831                         }
832
833                         memcpy(k, p, e - p);
834                         k[e - p] = '=';
835                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
836
837                         if (valid_user_field(p, e - p)) {
838                                 iovec[n].iov_base = k;
839                                 iovec[n].iov_len = (e - p) + 1 + l;
840                                 n++;
841                         } else
842                                 free(k);
843
844                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
845                         p = e + 1 + sizeof(uint64_t) + l + 1;
846                 }
847         }
848
849         dispatch_message(s, iovec, n, m, ucred, tv, priority);
850
851         for (j = 0; j < n; j++)
852                 if (iovec[j].iov_base < buffer ||
853                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
854                         free(iovec[j].iov_base);
855 }
856
857 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
858         struct iovec iovec[18];
859         char *message = NULL, *syslog_priority = NULL;
860         unsigned n = 0;
861         size_t tag_len;
862         int priority;
863
864         assert(s);
865         assert(p);
866
867         priority = s->priority;
868
869         if (s->priority_prefix &&
870             l > 3 &&
871             p[0] == '<' &&
872             p[1] >= '0' && p[1] <= '7' &&
873             p[2] == '>') {
874
875                 priority = p[1] - '0';
876                 p += 3;
877                 l -= 3;
878         }
879
880         if (l <= 0)
881                 return 0;
882
883         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
884                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
885
886         tag_len = s->tag ? strlen(s->tag) + 2: 0;
887         message = malloc(8 + tag_len + l);
888         if (message) {
889                 memcpy(message, "MESSAGE=", 8);
890
891                 if (s->tag) {
892                         memcpy(message+8, s->tag, tag_len-2);
893                         memcpy(message+8+tag_len-2, ": ", 2);
894                 }
895
896                 memcpy(message+8+tag_len, p, l);
897                 iovec[n].iov_base = message;
898                 iovec[n].iov_len = 8+tag_len+l;
899                 n++;
900         }
901
902         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
903
904         if (s->tee_console) {
905                 int console;
906
907                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
908                 if (console >= 0) {
909                         n = 0;
910                         if (s->tag) {
911                                 IOVEC_SET_STRING(iovec[n++], s->tag);
912                                 IOVEC_SET_STRING(iovec[n++], ": ");
913                         }
914
915                         iovec[n].iov_base = (void*) p;
916                         iovec[n].iov_len = l;
917                         n++;
918
919                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
920
921                         writev(console, iovec, n);
922                 }
923         }
924
925         free(message);
926         free(syslog_priority);
927
928         return 0;
929 }
930
931 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
932         assert(s);
933         assert(p);
934
935         while (l > 0 && strchr(WHITESPACE, *p)) {
936                 l--;
937                 p++;
938         }
939
940         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
941                 l--;
942
943         switch (s->state) {
944
945         case STDOUT_STREAM_TAG:
946
947                 if (l > 0) {
948                         s->tag = strndup(p, l);
949                         if (!s->tag) {
950                                 log_error("Out of memory");
951                                 return -EINVAL;
952                         }
953                 }
954
955                 s->state = STDOUT_STREAM_PRIORITY;
956                 return 0;
957
958         case STDOUT_STREAM_PRIORITY:
959                 if (l != 1 || *p < '0' || *p > '7') {
960                         log_warning("Failed to parse log priority line.");
961                         return -EINVAL;
962                 }
963
964                 s->priority = *p - '0';
965                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
966                 return 0;
967
968         case STDOUT_STREAM_PRIORITY_PREFIX:
969                 if (l != 1 || *p < '0' || *p > '1') {
970                         log_warning("Failed to parse priority prefix line.");
971                         return -EINVAL;
972                 }
973
974                 s->priority_prefix = *p - '0';
975                 s->state = STDOUT_STREAM_TEE_CONSOLE;
976                 return 0;
977
978         case STDOUT_STREAM_TEE_CONSOLE:
979                 if (l != 1 || *p < '0' || *p > '1') {
980                         log_warning("Failed to parse tee to console line.");
981                         return -EINVAL;
982                 }
983
984                 s->tee_console = *p - '0';
985                 s->state = STDOUT_STREAM_RUNNING;
986                 return 0;
987
988         case STDOUT_STREAM_RUNNING:
989                 return stdout_stream_log(s, p, l);
990         }
991
992         assert_not_reached("Unknown stream state");
993 }
994
995 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
996         char *p;
997         size_t remaining;
998         int r;
999
1000         assert(s);
1001
1002         p = s->buffer;
1003         remaining = s->length;
1004         for (;;) {
1005                 char *end;
1006                 size_t skip;
1007
1008                 end = memchr(p, '\n', remaining);
1009                 if (!end) {
1010                         if (remaining >= LINE_MAX) {
1011                                 end = p + LINE_MAX;
1012                                 skip = LINE_MAX;
1013                         } else
1014                                 break;
1015                 } else
1016                         skip = end - p + 1;
1017
1018                 r = stdout_stream_line(s, p, end - p);
1019                 if (r < 0)
1020                         return r;
1021
1022                 remaining -= skip;
1023                 p += skip;
1024         }
1025
1026         if (force_flush && remaining > 0) {
1027                 r = stdout_stream_line(s, p, remaining);
1028                 if (r < 0)
1029                         return r;
1030
1031                 p += remaining;
1032                 remaining = 0;
1033         }
1034
1035         if (p > s->buffer) {
1036                 memmove(s->buffer, p, remaining);
1037                 s->length = remaining;
1038         }
1039
1040         return 0;
1041 }
1042
1043 static int stdout_stream_process(StdoutStream *s) {
1044         ssize_t l;
1045         int r;
1046
1047         assert(s);
1048
1049         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1050         if (l < 0) {
1051
1052                 if (errno == EAGAIN)
1053                         return 0;
1054
1055                 log_warning("Failed to read from stream: %m");
1056                 return -errno;
1057         }
1058
1059         if (l == 0) {
1060                 r = stdout_stream_scan(s, true);
1061                 if (r < 0)
1062                         return r;
1063
1064                 return 0;
1065         }
1066
1067         s->length += l;
1068         r = stdout_stream_scan(s, false);
1069         if (r < 0)
1070                 return r;
1071
1072         return 1;
1073
1074 }
1075
1076 static void stdout_stream_free(StdoutStream *s) {
1077         assert(s);
1078
1079         if (s->server) {
1080                 assert(s->server->n_stdout_streams > 0);
1081                 s->server->n_stdout_streams --;
1082                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1083         }
1084
1085         if (s->fd >= 0) {
1086                 if (s->server)
1087                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1088
1089                 close_nointr_nofail(s->fd);
1090         }
1091
1092         free(s->tag);
1093         free(s);
1094 }
1095
1096 static int stdout_stream_new(Server *s) {
1097         StdoutStream *stream;
1098         int fd, r;
1099         socklen_t len;
1100         struct epoll_event ev;
1101
1102         assert(s);
1103
1104         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1105         if (fd < 0) {
1106                 if (errno == EAGAIN)
1107                         return 0;
1108
1109                 log_error("Failed to accept stdout connection: %m");
1110                 return -errno;
1111         }
1112
1113         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1114                 log_warning("Too many stdout streams, refusing connection.");
1115                 close_nointr_nofail(fd);
1116                 return 0;
1117         }
1118
1119         stream = new0(StdoutStream, 1);
1120         if (!stream) {
1121                 log_error("Out of memory.");
1122                 close_nointr_nofail(fd);
1123                 return -ENOMEM;
1124         }
1125
1126         stream->fd = fd;
1127
1128         len = sizeof(stream->ucred);
1129         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1130                 log_error("Failed to determine peer credentials: %m");
1131                 r = -errno;
1132                 goto fail;
1133         }
1134
1135         if (shutdown(fd, SHUT_WR) < 0) {
1136                 log_error("Failed to shutdown writing side of socket: %m");
1137                 r = -errno;
1138                 goto fail;
1139         }
1140
1141         zero(ev);
1142         ev.data.ptr = stream;
1143         ev.events = EPOLLIN;
1144         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1145                 log_error("Failed to add stream to event loop: %m");
1146                 r = -errno;
1147                 goto fail;
1148         }
1149
1150         stream->server = s;
1151         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1152         s->n_stdout_streams ++;
1153
1154         return 0;
1155
1156 fail:
1157         stdout_stream_free(stream);
1158         return r;
1159 }
1160
1161 static int system_journal_open(Server *s) {
1162         int r;
1163         char *fn;
1164         sd_id128_t machine;
1165         char ids[33];
1166
1167         r = sd_id128_get_machine(&machine);
1168         if (r < 0)
1169                 return r;
1170
1171         sd_id128_to_string(machine, ids);
1172
1173         if (!s->system_journal) {
1174
1175                 /* First try to create the machine path, but not the prefix */
1176                 fn = strappend("/var/log/journal/", ids);
1177                 if (!fn)
1178                         return -ENOMEM;
1179                 (void) mkdir(fn, 0755);
1180                 free(fn);
1181
1182                 /* The create the system journal file */
1183                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1184                 if (!fn)
1185                         return -ENOMEM;
1186
1187                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1188                 free(fn);
1189
1190                 if (r >= 0) {
1191                         journal_default_metrics(&s->system_metrics, s->system_journal->fd);
1192
1193                         s->system_journal->metrics = s->system_metrics;
1194                         s->system_journal->compress = s->compress;
1195
1196                         fix_perms(s->system_journal, 0);
1197                 } else if (r < 0) {
1198
1199                         if (r != -ENOENT && r != -EROFS)
1200                                 log_warning("Failed to open system journal: %s", strerror(-r));
1201
1202                         r = 0;
1203                 }
1204         }
1205
1206         if (!s->runtime_journal) {
1207
1208                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1209                 if (!fn)
1210                         return -ENOMEM;
1211
1212                 if (s->system_journal) {
1213
1214                         /* Try to open the runtime journal, but only
1215                          * if it already exists, so that we can flush
1216                          * it into the system journal */
1217
1218                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1219                         free(fn);
1220
1221                         if (r < 0) {
1222                                 if (r != -ENOENT)
1223                                         log_warning("Failed to open runtime journal: %s", strerror(-r));
1224
1225                                 r = 0;
1226                         }
1227
1228                 } else {
1229
1230                         /* OK, we really need the runtime journal, so create
1231                          * it if necessary. */
1232
1233                         (void) mkdir_parents(fn, 0755);
1234                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1235                         free(fn);
1236
1237                         if (r < 0) {
1238                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1239                                 return r;
1240                         }
1241                 }
1242
1243                 if (s->runtime_journal) {
1244                         journal_default_metrics(&s->runtime_metrics, s->runtime_journal->fd);
1245
1246                         s->runtime_journal->metrics = s->runtime_metrics;
1247                         s->runtime_journal->compress = s->compress;
1248
1249                         fix_perms(s->runtime_journal, 0);
1250                 }
1251         }
1252
1253         return r;
1254 }
1255
1256 static int server_flush_to_var(Server *s) {
1257         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1258         Object *o = NULL;
1259         int r;
1260         sd_id128_t machine;
1261         sd_journal *j;
1262         usec_t ts;
1263
1264         assert(s);
1265
1266         if (!s->runtime_journal)
1267                 return 0;
1268
1269         ts = now(CLOCK_MONOTONIC);
1270         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1271                 return 0;
1272
1273         s->var_available_timestamp = ts;
1274
1275         system_journal_open(s);
1276
1277         if (!s->system_journal)
1278                 return 0;
1279
1280         r = sd_id128_get_machine(&machine);
1281         if (r < 0) {
1282                 log_error("Failed to get machine id: %s", strerror(-r));
1283                 return r;
1284         }
1285
1286         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1287         if (r < 0) {
1288                 log_error("Failed to read runtime journal: %s", strerror(-r));
1289                 return r;
1290         }
1291
1292         SD_JOURNAL_FOREACH(j) {
1293                 JournalFile *f;
1294
1295                 f = j->current_file;
1296                 assert(f && f->current_offset > 0);
1297
1298                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1299                 if (r < 0) {
1300                         log_error("Can't read entry: %s", strerror(-r));
1301                         goto finish;
1302                 }
1303
1304                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1305                 if (r == -E2BIG) {
1306                         log_info("Allocation limit reached.");
1307
1308                         journal_file_post_change(s->system_journal);
1309                         server_vacuum(s);
1310
1311                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1312                 }
1313
1314                 if (r < 0) {
1315                         log_error("Can't write entry: %s", strerror(-r));
1316                         goto finish;
1317                 }
1318         }
1319
1320 finish:
1321         journal_file_post_change(s->system_journal);
1322
1323         journal_file_close(s->runtime_journal);
1324         s->runtime_journal = NULL;
1325
1326         if (r >= 0) {
1327                 sd_id128_to_string(machine, path + 17);
1328                 rm_rf(path, false, true, false);
1329         }
1330
1331         return r;
1332 }
1333
1334 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1335         struct msghdr msghdr;
1336         struct iovec iovec;
1337         struct cmsghdr *cmsg;
1338         union {
1339                 struct cmsghdr cmsghdr;
1340                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1341                             CMSG_SPACE(sizeof(struct timeval))];
1342         } control;
1343         union sockaddr_union sa;
1344
1345         assert(s);
1346
1347         zero(msghdr);
1348
1349         zero(iovec);
1350         iovec.iov_base = (void*) buffer;
1351         iovec.iov_len = length;
1352         msghdr.msg_iov = &iovec;
1353         msghdr.msg_iovlen = 1;
1354
1355         zero(sa);
1356         sa.un.sun_family = AF_UNIX;
1357         strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1358         msghdr.msg_name = &sa;
1359         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1360
1361         zero(control);
1362         msghdr.msg_control = &control;
1363         msghdr.msg_controllen = sizeof(control);
1364
1365         cmsg = CMSG_FIRSTHDR(&msghdr);
1366         cmsg->cmsg_level = SOL_SOCKET;
1367         cmsg->cmsg_type = SCM_CREDENTIALS;
1368         cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1369         memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1370         msghdr.msg_controllen = cmsg->cmsg_len;
1371
1372         /* Forward the syslog message we received via /dev/log to
1373          * /run/systemd/syslog. Unfortunately we currently can't set
1374          * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1375
1376         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1377                 return;
1378
1379         if (errno == ESRCH) {
1380                 struct ucred u;
1381
1382                 /* Hmm, presumably the sender process vanished
1383                  * by now, so let's fix it as good as we
1384                  * can, and retry */
1385
1386                 u = *ucred;
1387                 u.pid = getpid();
1388                 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1389
1390                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1391                         return;
1392         }
1393
1394         log_debug("Failed to forward syslog message: %m");
1395 }
1396
1397 static int process_event(Server *s, struct epoll_event *ev) {
1398         assert(s);
1399
1400         if (ev->data.fd == s->signal_fd) {
1401                 struct signalfd_siginfo sfsi;
1402                 ssize_t n;
1403
1404                 if (ev->events != EPOLLIN) {
1405                         log_info("Got invalid event from epoll.");
1406                         return -EIO;
1407                 }
1408
1409                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1410                 if (n != sizeof(sfsi)) {
1411
1412                         if (n >= 0)
1413                                 return -EIO;
1414
1415                         if (errno == EINTR || errno == EAGAIN)
1416                                 return 0;
1417
1418                         return -errno;
1419                 }
1420
1421                 if (sfsi.ssi_signo == SIGUSR1) {
1422                         server_flush_to_var(s);
1423                         return 0;
1424                 }
1425
1426                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1427                 return 0;
1428
1429         } else if (ev->data.fd == s->native_fd ||
1430                    ev->data.fd == s->syslog_fd) {
1431
1432                 if (ev->events != EPOLLIN) {
1433                         log_info("Got invalid event from epoll.");
1434                         return -EIO;
1435                 }
1436
1437                 for (;;) {
1438                         struct msghdr msghdr;
1439                         struct iovec iovec;
1440                         struct ucred *ucred = NULL;
1441                         struct timeval *tv = NULL;
1442                         struct cmsghdr *cmsg;
1443                         union {
1444                                 struct cmsghdr cmsghdr;
1445                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1446                                             CMSG_SPACE(sizeof(struct timeval))];
1447                         } control;
1448                         ssize_t n;
1449                         int v;
1450
1451                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1452                                 log_error("SIOCINQ failed: %m");
1453                                 return -errno;
1454                         }
1455
1456                         if (v <= 0)
1457                                 return 1;
1458
1459                         if (s->buffer_size < (size_t) v) {
1460                                 void *b;
1461                                 size_t l;
1462
1463                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1464                                 b = realloc(s->buffer, l+1);
1465
1466                                 if (!b) {
1467                                         log_error("Couldn't increase buffer.");
1468                                         return -ENOMEM;
1469                                 }
1470
1471                                 s->buffer_size = l;
1472                                 s->buffer = b;
1473                         }
1474
1475                         zero(iovec);
1476                         iovec.iov_base = s->buffer;
1477                         iovec.iov_len = s->buffer_size;
1478
1479                         zero(control);
1480                         zero(msghdr);
1481                         msghdr.msg_iov = &iovec;
1482                         msghdr.msg_iovlen = 1;
1483                         msghdr.msg_control = &control;
1484                         msghdr.msg_controllen = sizeof(control);
1485
1486                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1487                         if (n < 0) {
1488
1489                                 if (errno == EINTR || errno == EAGAIN)
1490                                         return 1;
1491
1492                                 log_error("recvmsg() failed: %m");
1493                                 return -errno;
1494                         }
1495
1496                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1497
1498                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1499                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1500                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1501                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1502                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1503                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1504                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1505                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1506                         }
1507
1508                         if (ev->data.fd == s->syslog_fd) {
1509                                 char *e;
1510
1511                                 e = memchr(s->buffer, '\n', n);
1512                                 if (e)
1513                                         *e = 0;
1514                                 else
1515                                         s->buffer[n] = 0;
1516
1517                                 forward_syslog(s, s->buffer, n, ucred, tv);
1518                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1519                         } else
1520                                 process_native_message(s, s->buffer, n, ucred, tv);
1521                 }
1522
1523                 return 1;
1524
1525         } else if (ev->data.fd == s->stdout_fd) {
1526
1527                 if (ev->events != EPOLLIN) {
1528                         log_info("Got invalid event from epoll.");
1529                         return -EIO;
1530                 }
1531
1532                 stdout_stream_new(s);
1533                 return 1;
1534
1535         } else {
1536                 StdoutStream *stream;
1537
1538                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1539                         log_info("Got invalid event from epoll.");
1540                         return -EIO;
1541                 }
1542
1543                 /* If it is none of the well-known fds, it must be an
1544                  * stdout stream fd. Note that this is a bit ugly here
1545                  * (since we rely that none of the well-known fds
1546                  * could be interpreted as pointer), but nonetheless
1547                  * safe, since the well-known fds would never get an
1548                  * fd > 4096, i.e. beyond the first memory page */
1549
1550                 stream = ev->data.ptr;
1551
1552                 if (stdout_stream_process(stream) <= 0)
1553                         stdout_stream_free(stream);
1554
1555                 return 1;
1556         }
1557
1558         log_error("Unknown event.");
1559         return 0;
1560 }
1561
1562 static int open_syslog_socket(Server *s) {
1563         union sockaddr_union sa;
1564         int one, r;
1565         struct epoll_event ev;
1566         struct timeval tv;
1567
1568         assert(s);
1569
1570         if (s->syslog_fd < 0) {
1571
1572                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1573                 if (s->syslog_fd < 0) {
1574                         log_error("socket() failed: %m");
1575                         return -errno;
1576                 }
1577
1578                 zero(sa);
1579                 sa.un.sun_family = AF_UNIX;
1580                 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1581
1582                 unlink(sa.un.sun_path);
1583
1584                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1585                 if (r < 0) {
1586                         log_error("bind() failed: %m");
1587                         return -errno;
1588                 }
1589
1590                 chmod(sa.un.sun_path, 0666);
1591         }
1592
1593         one = 1;
1594         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1595         if (r < 0) {
1596                 log_error("SO_PASSCRED failed: %m");
1597                 return -errno;
1598         }
1599
1600         one = 1;
1601         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1602         if (r < 0) {
1603                 log_error("SO_TIMESTAMP failed: %m");
1604                 return -errno;
1605         }
1606
1607         /* Since we use the same socket for forwarding this to some
1608          * other syslog implementation, make sure we don't hang
1609          * forever */
1610         timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1611         if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1612                 log_error("SO_SNDTIMEO failed: %m");
1613                 return -errno;
1614         }
1615
1616         zero(ev);
1617         ev.events = EPOLLIN;
1618         ev.data.fd = s->syslog_fd;
1619         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1620                 log_error("Failed to add syslog server fd to epoll object: %m");
1621                 return -errno;
1622         }
1623
1624         return 0;
1625 }
1626
1627 static int open_native_socket(Server*s) {
1628         union sockaddr_union sa;
1629         int one, r;
1630         struct epoll_event ev;
1631
1632         assert(s);
1633
1634         if (s->native_fd < 0) {
1635
1636                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1637                 if (s->native_fd < 0) {
1638                         log_error("socket() failed: %m");
1639                         return -errno;
1640                 }
1641
1642                 zero(sa);
1643                 sa.un.sun_family = AF_UNIX;
1644                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1645
1646                 unlink(sa.un.sun_path);
1647
1648                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1649                 if (r < 0) {
1650                         log_error("bind() failed: %m");
1651                         return -errno;
1652                 }
1653
1654                 chmod(sa.un.sun_path, 0666);
1655         }
1656
1657         one = 1;
1658         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1659         if (r < 0) {
1660                 log_error("SO_PASSCRED failed: %m");
1661                 return -errno;
1662         }
1663
1664         one = 1;
1665         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1666         if (r < 0) {
1667                 log_error("SO_TIMESTAMP failed: %m");
1668                 return -errno;
1669         }
1670
1671         zero(ev);
1672         ev.events = EPOLLIN;
1673         ev.data.fd = s->native_fd;
1674         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1675                 log_error("Failed to add native server fd to epoll object: %m");
1676                 return -errno;
1677         }
1678
1679         return 0;
1680 }
1681
1682 static int open_stdout_socket(Server *s) {
1683         union sockaddr_union sa;
1684         int r;
1685         struct epoll_event ev;
1686
1687         assert(s);
1688
1689         if (s->stdout_fd < 0) {
1690
1691                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1692                 if (s->stdout_fd < 0) {
1693                         log_error("socket() failed: %m");
1694                         return -errno;
1695                 }
1696
1697                 zero(sa);
1698                 sa.un.sun_family = AF_UNIX;
1699                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1700
1701                 unlink(sa.un.sun_path);
1702
1703                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1704                 if (r < 0) {
1705                         log_error("bind() failed: %m");
1706                         return -errno;
1707                 }
1708
1709                 chmod(sa.un.sun_path, 0666);
1710
1711                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1712                         log_error("liste() failed: %m");
1713                         return -errno;
1714                 }
1715         }
1716
1717         zero(ev);
1718         ev.events = EPOLLIN;
1719         ev.data.fd = s->stdout_fd;
1720         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1721                 log_error("Failed to add stdout server fd to epoll object: %m");
1722                 return -errno;
1723         }
1724
1725         return 0;
1726 }
1727
1728 static int open_signalfd(Server *s) {
1729         sigset_t mask;
1730         struct epoll_event ev;
1731
1732         assert(s);
1733
1734         assert_se(sigemptyset(&mask) == 0);
1735         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1736         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1737
1738         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1739         if (s->signal_fd < 0) {
1740                 log_error("signalfd(): %m");
1741                 return -errno;
1742         }
1743
1744         zero(ev);
1745         ev.events = EPOLLIN;
1746         ev.data.fd = s->signal_fd;
1747
1748         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1749                 log_error("epoll_ctl(): %m");
1750                 return -errno;
1751         }
1752
1753         return 0;
1754 }
1755
1756 static int server_init(Server *s) {
1757         int n, r, fd;
1758
1759         assert(s);
1760
1761         zero(*s);
1762         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1763         s->compress = true;
1764
1765         memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
1766         memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
1767
1768         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1769         if (!s->user_journals) {
1770                 log_error("Out of memory.");
1771                 return -ENOMEM;
1772         }
1773
1774         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1775         if (s->epoll_fd < 0) {
1776                 log_error("Failed to create epoll object: %m");
1777                 return -errno;
1778         }
1779
1780         n = sd_listen_fds(true);
1781         if (n < 0) {
1782                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1783                 return n;
1784         }
1785
1786         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1787
1788                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1789
1790                         if (s->native_fd >= 0) {
1791                                 log_error("Too many native sockets passed.");
1792                                 return -EINVAL;
1793                         }
1794
1795                         s->native_fd = fd;
1796
1797                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1798
1799                         if (s->stdout_fd >= 0) {
1800                                 log_error("Too many stdout sockets passed.");
1801                                 return -EINVAL;
1802                         }
1803
1804                         s->stdout_fd = fd;
1805
1806                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1807
1808                         if (s->syslog_fd >= 0) {
1809                                 log_error("Too many /dev/log sockets passed.");
1810                                 return -EINVAL;
1811                         }
1812
1813                         s->syslog_fd = fd;
1814
1815                 } else {
1816                         log_error("Unknown socket passed.");
1817                         return -EINVAL;
1818                 }
1819         }
1820
1821         r = open_syslog_socket(s);
1822         if (r < 0)
1823                 return r;
1824
1825         r = open_native_socket(s);
1826         if (r < 0)
1827                 return r;
1828
1829         r = open_stdout_socket(s);
1830         if (r < 0)
1831                 return r;
1832
1833         r = system_journal_open(s);
1834         if (r < 0)
1835                 return r;
1836
1837         r = open_signalfd(s);
1838         if (r < 0)
1839                 return r;
1840
1841         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1842         if (!s->rate_limit)
1843                 return -ENOMEM;
1844
1845         return 0;
1846 }
1847
1848 static void server_done(Server *s) {
1849         JournalFile *f;
1850         assert(s);
1851
1852         while (s->stdout_streams)
1853                 stdout_stream_free(s->stdout_streams);
1854
1855         if (s->system_journal)
1856                 journal_file_close(s->system_journal);
1857
1858         if (s->runtime_journal)
1859                 journal_file_close(s->runtime_journal);
1860
1861         while ((f = hashmap_steal_first(s->user_journals)))
1862                 journal_file_close(f);
1863
1864         hashmap_free(s->user_journals);
1865
1866         if (s->epoll_fd >= 0)
1867                 close_nointr_nofail(s->epoll_fd);
1868
1869         if (s->signal_fd >= 0)
1870                 close_nointr_nofail(s->signal_fd);
1871
1872         if (s->syslog_fd >= 0)
1873                 close_nointr_nofail(s->syslog_fd);
1874
1875         if (s->native_fd >= 0)
1876                 close_nointr_nofail(s->native_fd);
1877
1878         if (s->stdout_fd >= 0)
1879                 close_nointr_nofail(s->stdout_fd);
1880
1881         if (s->rate_limit)
1882                 journal_rate_limit_free(s->rate_limit);
1883
1884         free(s->buffer);
1885 }
1886
1887 int main(int argc, char *argv[]) {
1888         Server server;
1889         int r;
1890
1891         /* if (getppid() != 1) { */
1892         /*         log_error("This program should be invoked by init only."); */
1893         /*         return EXIT_FAILURE; */
1894         /* } */
1895
1896         if (argc > 1) {
1897                 log_error("This program does not take arguments.");
1898                 return EXIT_FAILURE;
1899         }
1900
1901         log_set_target(LOG_TARGET_CONSOLE);
1902         log_parse_environment();
1903         log_open();
1904
1905         umask(0022);
1906
1907         r = server_init(&server);
1908         if (r < 0)
1909                 goto finish;
1910
1911         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1912
1913         sd_notify(false,
1914                   "READY=1\n"
1915                   "STATUS=Processing requests...");
1916
1917         server_vacuum(&server);
1918         server_flush_to_var(&server);
1919
1920         for (;;) {
1921                 struct epoll_event event;
1922
1923                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1924                 if (r < 0) {
1925
1926                         if (errno == EINTR)
1927                                 continue;
1928
1929                         log_error("epoll_wait() failed: %m");
1930                         r = -errno;
1931                         goto finish;
1932                 } else if (r == 0)
1933                         break;
1934
1935                 r = process_event(&server, &event);
1936                 if (r < 0)
1937                         goto finish;
1938                 else if (r == 0)
1939                         break;
1940         }
1941
1942         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1943
1944 finish:
1945         sd_notify(false,
1946                   "STATUS=Shutting down...");
1947
1948         server_done(&server);
1949
1950         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1951 }