chiark / gitweb /
journald: add _SYSTEMD_SESSION, _SYSTEMD_OWNER_UID, _SYSTEMD_SERVICE to all entries
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "sd-login.h"
45 #include "journal-internal.h"
46
/* Upper bound on the number of per-user journal files kept open at once;
 * find_journal() closes cached files until the cache is below this limit
 * before opening another one. */
#define USER_JOURNALS_MAX 1024
/* Presumably the cap on concurrently connected stdout streams (compare
 * Server.n_stdout_streams) -- the enforcing code is not in this section. */
#define STDOUT_STREAMS_MAX 4096

/* Default rate limiting parameters. NOTE(review): presumably handed to the
 * JournalRateLimit object at server setup -- confirm against the init code. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long available_space() keeps returning its cached value before it
 * re-scans the journal directory. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* NOTE(review): looks like the interval between checks for /var becoming
 * usable (compare Server.var_available_timestamp) -- user not visible here. */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): a syslog-related timeout; its user is outside this section. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)
58
/* Forward declaration, so that Server can carry a list of streams before
 * struct StdoutStream itself is defined. */
typedef struct StdoutStream StdoutStream;

/* Global state of the journal daemon. */
typedef struct Server {
        /* File descriptors owned by the server. NOTE(review): judging by the
         * names these are the epoll instance, the signalfd and the three
         * input sockets -- their setup is not part of this section. */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* Open journal files. As long as runtime_journal is set,
         * find_journal() returns it for every entry; otherwise entries go
         * to system_journal or to a per-user file from user_journals. */
        JournalFile *runtime_journal;
        JournalFile *system_journal;
        /* uid -> JournalFile*, holding at most USER_JOURNALS_MAX entries */
        Hashmap *user_journals;

        /* Handed by reference to journal_file_append_entry() for each entry
         * that is written */
        uint64_t seqnum;

        /* NOTE(review): presumably the receive buffer for incoming
         * datagrams -- not used in this section */
        char *buffer;
        size_t buffer_size;

        /* Consulted by dispatch_message() to decide whether an entry is
         * dropped */
        JournalRateLimit *rate_limit;

        /* Size limits (max_use, keep_free) for /run/log/journal and
         * /var/log/journal respectively */
        JournalMetrics runtime_metrics;
        JournalMetrics system_metrics;

        /* Copied into the compress flag of every newly opened user journal */
        bool compress;

        /* Result of the last available_space() computation and the
         * CLOCK_MONOTONIC time at which it was taken; server_vacuum() resets
         * the timestamp to 0 to force a recomputation. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* NOTE(review): presumably when /var was last probed (compare
         * RECHECK_VAR_AVAILABLE_USEC) -- not used in this section */
        uint64_t var_available_timestamp;

        /* Connected stdout streams and their count */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
92
/* Parser state of a stdout stream connection. stdout_stream_line() consumes
 * one header line per state, in the order listed here (tag, priority,
 * priority-prefix flag, tee-to-console flag).
 * NOTE(review): STDOUT_STREAM_RUNNING is presumably the state in which lines
 * are logged as payload -- that transition is outside this section. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,
        STDOUT_STREAM_PRIORITY,
        STDOUT_STREAM_PRIORITY_PREFIX,
        STDOUT_STREAM_TEE_CONSOLE,
        STDOUT_STREAM_RUNNING
} StdoutStreamState;
100
/* One connected stdout stream client. */
struct StdoutStream {
        Server *server;            /* server that entries are dispatched to */
        StdoutStreamState state;   /* which header line is expected next */

        int fd;

        /* Credentials passed along with every entry logged from this stream */
        struct ucred ucred;

        /* Optional prefix; stdout_stream_log() prepends "<tag>: " to each
         * message when this is set */
        char *tag;
        /* Priority used for lines that carry no "<N>" prefix of their own */
        int priority;
        /* If set, a leading "<0>".."<7>" on a line overrides the priority */
        bool priority_prefix:1;
        /* If set, each line is additionally written to /dev/console */
        bool tee_console:1;

        /* NOTE(review): presumably the partial-line read buffer and the
         * number of bytes currently held in it -- the reader is not in this
         * section */
        char buffer[LINE_MAX+1];
        size_t length;

        LIST_FIELDS(StdoutStream, stdout_stream);
};
119
/* Forward declaration: dispatch_message_real() calls this before it picks a
 * journal file. NOTE(review): by its name it migrates the runtime journal to
 * /var -- the definition is outside this section. */
static int server_flush_to_var(Server *s);
121
122 static uint64_t available_space(Server *s) {
123         char ids[33], *p;
124         const char *f;
125         sd_id128_t machine;
126         struct statvfs ss;
127         uint64_t sum = 0, avail = 0, ss_avail = 0;
128         int r;
129         DIR *d;
130         usec_t ts;
131         JournalMetrics *m;
132
133         ts = now(CLOCK_MONOTONIC);
134
135         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
136                 return s->cached_available_space;
137
138         r = sd_id128_get_machine(&machine);
139         if (r < 0)
140                 return 0;
141
142         if (s->system_journal) {
143                 f = "/var/log/journal/";
144                 m = &s->system_metrics;
145         } else {
146                 f = "/run/log/journal/";
147                 m = &s->runtime_metrics;
148         }
149
150         assert(m);
151
152         p = strappend(f, sd_id128_to_string(machine, ids));
153         if (!p)
154                 return 0;
155
156         d = opendir(p);
157         free(p);
158
159         if (!d)
160                 return 0;
161
162         if (fstatvfs(dirfd(d), &ss) < 0)
163                 goto finish;
164
165         for (;;) {
166                 struct stat st;
167                 struct dirent buf, *de;
168                 int k;
169
170                 k = readdir_r(d, &buf, &de);
171                 if (k != 0) {
172                         r = -k;
173                         goto finish;
174                 }
175
176                 if (!de)
177                         break;
178
179                 if (!dirent_is_file_with_suffix(de, ".journal"))
180                         continue;
181
182                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
183                         continue;
184
185                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
186         }
187
188         avail = sum >= m->max_use ? 0 : m->max_use - sum;
189
190         ss_avail = ss.f_bsize * ss.f_bavail;
191
192         ss_avail = ss_avail < m->keep_free ? 0 : ss_avail - m->keep_free;
193
194         if (ss_avail < avail)
195                 avail = ss_avail;
196
197         s->cached_available_space = avail;
198         s->cached_available_space_timestamp = ts;
199
200 finish:
201         closedir(d);
202
203         return avail;
204 }
205
206 static void fix_perms(JournalFile *f, uid_t uid) {
207         acl_t acl;
208         acl_entry_t entry;
209         acl_permset_t permset;
210         int r;
211
212         assert(f);
213
214         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
215         if (r < 0)
216                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
217
218         if (uid <= 0)
219                 return;
220
221         acl = acl_get_fd(f->fd);
222         if (!acl) {
223                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
224                 return;
225         }
226
227         r = acl_find_uid(acl, uid, &entry);
228         if (r <= 0) {
229
230                 if (acl_create_entry(&acl, &entry) < 0 ||
231                     acl_set_tag_type(entry, ACL_USER) < 0 ||
232                     acl_set_qualifier(entry, &uid) < 0) {
233                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
234                         goto finish;
235                 }
236         }
237
238         if (acl_get_permset(entry, &permset) < 0 ||
239             acl_add_perm(permset, ACL_READ) < 0 ||
240             acl_calc_mask(&acl) < 0) {
241                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
242                 goto finish;
243         }
244
245         if (acl_set_fd(f->fd, acl) < 0)
246                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
247
248 finish:
249         acl_free(acl);
250 }
251
252 static JournalFile* find_journal(Server *s, uid_t uid) {
253         char *p;
254         int r;
255         JournalFile *f;
256         char ids[33];
257         sd_id128_t machine;
258
259         assert(s);
260
261         /* We split up user logs only on /var, not on /run. If the
262          * runtime file is open, we write to it exclusively, in order
263          * to guarantee proper order as soon as we flush /run to
264          * /var and close the runtime file. */
265
266         if (s->runtime_journal)
267                 return s->runtime_journal;
268
269         if (uid <= 0)
270                 return s->system_journal;
271
272         r = sd_id128_get_machine(&machine);
273         if (r < 0)
274                 return s->system_journal;
275
276         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
277         if (f)
278                 return f;
279
280         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
281                 return s->system_journal;
282
283         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
284                 /* Too many open? Then let's close one */
285                 f = hashmap_steal_first(s->user_journals);
286                 assert(f);
287                 journal_file_close(f);
288         }
289
290         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
291         free(p);
292
293         if (r < 0)
294                 return s->system_journal;
295
296         fix_perms(f, uid);
297         f->metrics = s->system_metrics;
298         f->compress = s->compress;
299
300         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
301         if (r < 0) {
302                 journal_file_close(f);
303                 return s->system_journal;
304         }
305
306         return f;
307 }
308
309 static void server_vacuum(Server *s) {
310         Iterator i;
311         void *k;
312         char *p;
313         char ids[33];
314         sd_id128_t machine;
315         int r;
316         JournalFile *f;
317
318         log_info("Rotating...");
319
320         if (s->runtime_journal) {
321                 r = journal_file_rotate(&s->runtime_journal);
322                 if (r < 0)
323                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
324         }
325
326         if (s->system_journal) {
327                 r = journal_file_rotate(&s->system_journal);
328                 if (r < 0)
329                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
330         }
331
332         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
333                 r = journal_file_rotate(&f);
334                 if (r < 0)
335                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
336                 else
337                         hashmap_replace(s->user_journals, k, f);
338         }
339
340         log_info("Vacuuming...");
341
342         r = sd_id128_get_machine(&machine);
343         if (r < 0) {
344                 log_error("Failed to get machine ID: %s", strerror(-r));
345                 return;
346         }
347
348         sd_id128_to_string(machine, ids);
349
350         if (s->system_journal) {
351                 if (asprintf(&p, "/var/log/journal/%s", ids) < 0) {
352                         log_error("Out of memory.");
353                         return;
354                 }
355
356                 r = journal_directory_vacuum(p, s->system_metrics.max_use, s->system_metrics.keep_free);
357                 if (r < 0 && r != -ENOENT)
358                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
359                 free(p);
360         }
361
362
363         if (s->runtime_journal) {
364                 if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
365                         log_error("Out of memory.");
366                         return;
367                 }
368
369                 r = journal_directory_vacuum(p, s->runtime_metrics.max_use, s->runtime_metrics.keep_free);
370                 if (r < 0 && r != -ENOENT)
371                         log_error("Failed to vacuum %s: %s", p, strerror(-r));
372                 free(p);
373         }
374
375         s->cached_available_space_timestamp = 0;
376 }
377
378 static char *shortened_cgroup_path(pid_t pid) {
379         int r;
380         char *process_path, *init_path, *path;
381
382         assert(pid > 0);
383
384         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
385         if (r < 0)
386                 return NULL;
387
388         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
389         if (r < 0) {
390                 free(process_path);
391                 return NULL;
392         }
393
394         if (streq(init_path, "/"))
395                 init_path[0] = 0;
396
397         if (startswith(process_path, init_path)) {
398                 char *p;
399
400                 p = strdup(process_path + strlen(init_path));
401                 if (!p) {
402                         free(process_path);
403                         free(init_path);
404                         return NULL;
405                 }
406                 path = p;
407         } else {
408                 path = process_path;
409                 process_path = NULL;
410         }
411
412         free(process_path);
413         free(init_path);
414
415         return path;
416 }
417
418 static void dispatch_message_real(Server *s,
419                              struct iovec *iovec, unsigned n, unsigned m,
420                              struct ucred *ucred,
421                              struct timeval *tv) {
422
423         char *pid = NULL, *uid = NULL, *gid = NULL,
424                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
425                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
426                 *audit_session = NULL, *audit_loginuid = NULL,
427                 *exe = NULL, *cgroup = NULL, *session = NULL,
428                 *owner_uid = NULL, *service = NULL;
429
430         char idbuf[33];
431         sd_id128_t id;
432         int r;
433         char *t;
434         uid_t loginuid = 0, realuid = 0;
435         JournalFile *f;
436         bool vacuumed = false;
437
438         assert(s);
439         assert(iovec);
440         assert(n > 0);
441         assert(n + 16 <= m);
442
443         if (ucred) {
444                 uint32_t audit;
445                 uid_t owner;
446
447                 realuid = ucred->uid;
448
449                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
450                         IOVEC_SET_STRING(iovec[n++], pid);
451
452                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
453                         IOVEC_SET_STRING(iovec[n++], uid);
454
455                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
456                         IOVEC_SET_STRING(iovec[n++], gid);
457
458                 r = get_process_comm(ucred->pid, &t);
459                 if (r >= 0) {
460                         comm = strappend("_COMM=", t);
461                         free(t);
462
463                         if (comm)
464                                 IOVEC_SET_STRING(iovec[n++], comm);
465                 }
466
467                 r = get_process_exe(ucred->pid, &t);
468                 if (r >= 0) {
469                         exe = strappend("_EXE=", t);
470                         free(t);
471
472                         if (comm)
473                                 IOVEC_SET_STRING(iovec[n++], exe);
474                 }
475
476                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
477                 if (r >= 0) {
478                         cmdline = strappend("_CMDLINE=", t);
479                         free(t);
480
481                         if (cmdline)
482                                 IOVEC_SET_STRING(iovec[n++], cmdline);
483                 }
484
485                 r = audit_session_from_pid(ucred->pid, &audit);
486                 if (r >= 0)
487                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) audit) >= 0)
488                                 IOVEC_SET_STRING(iovec[n++], audit_session);
489
490                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
491                 if (r >= 0)
492                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
493                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
494
495                 t = shortened_cgroup_path(ucred->pid);
496                 if (t) {
497                         cgroup = strappend("_SYSTEMD_CGROUP=", t);
498                         free(t);
499
500                         if (cgroup)
501                                 IOVEC_SET_STRING(iovec[n++], cgroup);
502                 }
503
504                 if (sd_pid_get_session(ucred->pid, &t) >= 0) {
505                         session = strappend("_SYSTEMD_SESSION=", t);
506                         free(t);
507
508                         if (session)
509                                 IOVEC_SET_STRING(iovec[n++], session);
510                 }
511
512                 if (sd_pid_get_service(ucred->pid, &t) >= 0) {
513                         service = strappend("_SYSTEMD_SERVICE=", t);
514                         free(t);
515
516                         if (service)
517                                 IOVEC_SET_STRING(iovec[n++], service);
518                 }
519
520                 if (sd_pid_get_owner_uid(ucred->uid, &owner) >= 0)
521                         if (asprintf(&owner_uid, "_SYSTEMD_OWNER_UID=%lu", (unsigned long) owner) >= 0)
522                                 IOVEC_SET_STRING(iovec[n++], owner_uid);
523         }
524
525         if (tv) {
526                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
527                              (unsigned long long) timeval_load(tv)) >= 0)
528                         IOVEC_SET_STRING(iovec[n++], source_time);
529         }
530
531         /* Note that strictly speaking storing the boot id here is
532          * redundant since the entry includes this in-line
533          * anyway. However, we need this indexed, too. */
534         r = sd_id128_get_boot(&id);
535         if (r >= 0)
536                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
537                         IOVEC_SET_STRING(iovec[n++], boot_id);
538
539         r = sd_id128_get_machine(&id);
540         if (r >= 0)
541                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
542                         IOVEC_SET_STRING(iovec[n++], machine_id);
543
544         t = gethostname_malloc();
545         if (t) {
546                 hostname = strappend("_HOSTNAME=", t);
547                 free(t);
548                 if (hostname)
549                         IOVEC_SET_STRING(iovec[n++], hostname);
550         }
551
552         assert(n <= m);
553
554         server_flush_to_var(s);
555
556 retry:
557         f = find_journal(s, realuid == 0 ? 0 : loginuid);
558         if (!f)
559                 log_warning("Dropping message, as we can't find a place to store the data.");
560         else {
561                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
562
563                 if (r == -E2BIG && !vacuumed) {
564                         log_info("Allocation limit reached.");
565
566                         server_vacuum(s);
567                         vacuumed = true;
568
569                         log_info("Retrying write.");
570                         goto retry;
571                 }
572
573                 if (r < 0)
574                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
575         }
576
577         free(pid);
578         free(uid);
579         free(gid);
580         free(comm);
581         free(exe);
582         free(cmdline);
583         free(source_time);
584         free(boot_id);
585         free(machine_id);
586         free(hostname);
587         free(audit_session);
588         free(audit_loginuid);
589         free(cgroup);
590         free(session);
591         free(owner_uid);
592         free(service);
593 }
594
595 static void dispatch_message(Server *s,
596                              struct iovec *iovec, unsigned n, unsigned m,
597                              struct ucred *ucred,
598                              struct timeval *tv,
599                              int priority) {
600         int rl;
601         char *path = NULL, *c;
602
603         assert(s);
604         assert(iovec || n == 0);
605
606         if (n == 0)
607                 return;
608
609         if (!ucred)
610                 goto finish;
611
612         path = shortened_cgroup_path(ucred->pid);
613         if (!path)
614                 goto finish;
615
616         /* example: /user/lennart/3/foobar
617          *          /system/dbus.service/foobar
618          *
619          * So let's cut of everything past the third /, since that is
620          * wher user directories start */
621
622         c = strchr(path, '/');
623         if (c) {
624                 c = strchr(c+1, '/');
625                 if (c) {
626                         c = strchr(c+1, '/');
627                         if (c)
628                                 *c = 0;
629                 }
630         }
631
632         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
633
634         if (rl == 0) {
635                 free(path);
636                 return;
637         }
638
639         if (rl > 1) {
640                 int j = 0;
641                 char suppress_message[LINE_MAX];
642                 struct iovec suppress_iovec[18];
643
644                 /* Write a suppression message if we suppressed something */
645
646                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
647                 char_array_0(suppress_message);
648
649                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
650                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
651
652                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
653         }
654
655         free(path);
656
657 finish:
658         dispatch_message_real(s, iovec, n, m, ucred, tv);
659 }
660
661 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
662         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
663         struct iovec iovec[19];
664         unsigned n = 0;
665         int priority = LOG_USER | LOG_INFO;
666
667         assert(s);
668         assert(buf);
669
670         parse_syslog_priority((char**) &buf, &priority);
671         skip_syslog_date((char**) &buf);
672
673         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
674                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
675
676         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
677                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
678
679         message = strappend("MESSAGE=", buf);
680         if (message)
681                 IOVEC_SET_STRING(iovec[n++], message);
682
683         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
684
685         free(message);
686         free(syslog_facility);
687         free(syslog_priority);
688 }
689
/* Checks whether the l bytes at p are an acceptable name for a field
 * supplied by a client.
 *
 * This roughly follows the POSIX recommendations for environment variable
 * names, with a few extra restrictions:
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
 *
 * A name is accepted if it is 1..64 bytes long, starts with neither an
 * underscore (such names are protected) nor a digit, and consists solely
 * of the characters A-Z, 0-9 and '_'. */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        /* Neither empty nor overly long names */
        if (l == 0 || l > 64)
                return false;

        /* The first character is more restricted than the rest */
        if (p[0] == '_')
                return false;

        if (p[0] >= '0' && p[0] <= '9')
                return false;

        for (i = 0; i < l; i++) {
                char c = p[i];
                bool upper = c >= 'A' && c <= 'Z';
                bool digit = c >= '0' && c <= '9';

                if (!upper && !digit && c != '_')
                        return false;
        }

        return true;
}
724
725 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
726         struct iovec *iovec = NULL;
727         unsigned n = 0, m = 0, j;
728         const char *p;
729         size_t remaining;
730         int priority = LOG_INFO;
731
732         assert(s);
733         assert(buffer || n == 0);
734
735         p = buffer;
736         remaining = buffer_size;
737
738         while (remaining > 0) {
739                 const char *e, *q;
740
741                 e = memchr(p, '\n', remaining);
742
743                 if (!e) {
744                         /* Trailing noise, let's ignore it, and flush what we collected */
745                         log_debug("Received message with trailing noise, ignoring.");
746                         break;
747                 }
748
749                 if (e == p) {
750                         /* Entry separator */
751                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
752                         n = 0;
753                         priority = LOG_INFO;
754
755                         p++;
756                         remaining--;
757                         continue;
758                 }
759
760                 if (*p == '.' || *p == '#') {
761                         /* Ignore control commands for now, and
762                          * comments too. */
763                         remaining -= (e - p) + 1;
764                         p = e + 1;
765                         continue;
766                 }
767
768                 /* A property follows */
769
770                 if (n+16 >= m) {
771                         struct iovec *c;
772                         unsigned u;
773
774                         u = MAX((n+16U) * 2U, 4U);
775                         c = realloc(iovec, u * sizeof(struct iovec));
776                         if (!c) {
777                                 log_error("Out of memory");
778                                 break;
779                         }
780
781                         iovec = c;
782                         m = u;
783                 }
784
785                 q = memchr(p, '=', e - p);
786                 if (q) {
787                         if (valid_user_field(p, q - p)) {
788                                 /* If the field name starts with an
789                                  * underscore, skip the variable,
790                                  * since that indidates a trusted
791                                  * field */
792                                 iovec[n].iov_base = (char*) p;
793                                 iovec[n].iov_len = e - p;
794                                 n++;
795
796                                 /* We need to determine the priority
797                                  * of this entry for the rate limiting
798                                  * logic */
799                                 if (e - p == 10 &&
800                                     memcmp(p, "PRIORITY=", 10) == 0 &&
801                                     p[10] >= '0' &&
802                                     p[10] <= '9')
803                                         priority = p[10] - '0';
804                         }
805
806                         remaining -= (e - p) + 1;
807                         p = e + 1;
808                         continue;
809                 } else {
810                         uint64_t l;
811                         char *k;
812
813                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
814                                 log_debug("Failed to parse message, ignoring.");
815                                 break;
816                         }
817
818                         memcpy(&l, e + 1, sizeof(uint64_t));
819                         l = le64toh(l);
820
821                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
822                             e[1+sizeof(uint64_t)+l] != '\n') {
823                                 log_debug("Failed to parse message, ignoring.");
824                                 break;
825                         }
826
827                         k = malloc((e - p) + 1 + l);
828                         if (!k) {
829                                 log_error("Out of memory");
830                                 break;
831                         }
832
833                         memcpy(k, p, e - p);
834                         k[e - p] = '=';
835                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
836
837                         if (valid_user_field(p, e - p)) {
838                                 iovec[n].iov_base = k;
839                                 iovec[n].iov_len = (e - p) + 1 + l;
840                                 n++;
841                         } else
842                                 free(k);
843
844                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
845                         p = e + 1 + sizeof(uint64_t) + l + 1;
846                 }
847         }
848
849         dispatch_message(s, iovec, n, m, ucred, tv, priority);
850
851         for (j = 0; j < n; j++)
852                 if (iovec[j].iov_base < buffer ||
853                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
854                         free(iovec[j].iov_base);
855 }
856
857 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
858         struct iovec iovec[18];
859         char *message = NULL, *syslog_priority = NULL;
860         unsigned n = 0;
861         size_t tag_len;
862         int priority;
863
864         assert(s);
865         assert(p);
866
867         priority = s->priority;
868
869         if (s->priority_prefix &&
870             l > 3 &&
871             p[0] == '<' &&
872             p[1] >= '0' && p[1] <= '7' &&
873             p[2] == '>') {
874
875                 priority = p[1] - '0';
876                 p += 3;
877                 l -= 3;
878         }
879
880         if (l <= 0)
881                 return 0;
882
883         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
884                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
885
886         tag_len = s->tag ? strlen(s->tag) + 2: 0;
887         message = malloc(8 + tag_len + l);
888         if (message) {
889                 memcpy(message, "MESSAGE=", 8);
890
891                 if (s->tag) {
892                         memcpy(message+8, s->tag, tag_len-2);
893                         memcpy(message+8+tag_len-2, ": ", 2);
894                 }
895
896                 memcpy(message+8+tag_len, p, l);
897                 iovec[n].iov_base = message;
898                 iovec[n].iov_len = 8+tag_len+l;
899                 n++;
900         }
901
902         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
903
904         if (s->tee_console) {
905                 int console;
906
907                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
908                 if (console >= 0) {
909                         n = 0;
910                         if (s->tag) {
911                                 IOVEC_SET_STRING(iovec[n++], s->tag);
912                                 IOVEC_SET_STRING(iovec[n++], ": ");
913                         }
914
915                         iovec[n].iov_base = (void*) p;
916                         iovec[n].iov_len = l;
917                         n++;
918
919                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
920
921                         writev(console, iovec, n);
922                 }
923         }
924
925         free(message);
926         free(syslog_priority);
927
928         return 0;
929 }
930
931 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
932         assert(s);
933         assert(p);
934
935         while (l > 0 && strchr(WHITESPACE, *p)) {
936                 l--;
937                 p++;
938         }
939
940         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
941                 l--;
942
943         switch (s->state) {
944
945         case STDOUT_STREAM_TAG:
946
947                 if (l > 0) {
948                         s->tag = strndup(p, l);
949                         if (!s->tag) {
950                                 log_error("Out of memory");
951                                 return -EINVAL;
952                         }
953                 }
954
955                 s->state = STDOUT_STREAM_PRIORITY;
956                 return 0;
957
958         case STDOUT_STREAM_PRIORITY:
959                 if (l != 1 || *p < '0' || *p > '7') {
960                         log_warning("Failed to parse log priority line.");
961                         return -EINVAL;
962                 }
963
964                 s->priority = *p - '0';
965                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
966                 return 0;
967
968         case STDOUT_STREAM_PRIORITY_PREFIX:
969                 if (l != 1 || *p < '0' || *p > '1') {
970                         log_warning("Failed to parse priority prefix line.");
971                         return -EINVAL;
972                 }
973
974                 s->priority_prefix = *p - '0';
975                 s->state = STDOUT_STREAM_TEE_CONSOLE;
976                 return 0;
977
978         case STDOUT_STREAM_TEE_CONSOLE:
979                 if (l != 1 || *p < '0' || *p > '1') {
980                         log_warning("Failed to parse tee to console line.");
981                         return -EINVAL;
982                 }
983
984                 s->tee_console = *p - '0';
985                 s->state = STDOUT_STREAM_RUNNING;
986                 return 0;
987
988         case STDOUT_STREAM_RUNNING:
989                 return stdout_stream_log(s, p, l);
990         }
991
992         assert_not_reached("Unknown stream state");
993 }
994
995 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
996         char *p;
997         size_t remaining;
998         int r;
999
1000         assert(s);
1001
1002         p = s->buffer;
1003         remaining = s->length;
1004         for (;;) {
1005                 char *end;
1006                 size_t skip;
1007
1008                 end = memchr(p, '\n', remaining);
1009                 if (!end) {
1010                         if (remaining >= LINE_MAX) {
1011                                 end = p + LINE_MAX;
1012                                 skip = LINE_MAX;
1013                         } else
1014                                 break;
1015                 } else
1016                         skip = end - p + 1;
1017
1018                 r = stdout_stream_line(s, p, end - p);
1019                 if (r < 0)
1020                         return r;
1021
1022                 remaining -= skip;
1023                 p += skip;
1024         }
1025
1026         if (force_flush && remaining > 0) {
1027                 r = stdout_stream_line(s, p, remaining);
1028                 if (r < 0)
1029                         return r;
1030
1031                 p += remaining;
1032                 remaining = 0;
1033         }
1034
1035         if (p > s->buffer) {
1036                 memmove(s->buffer, p, remaining);
1037                 s->length = remaining;
1038         }
1039
1040         return 0;
1041 }
1042
1043 static int stdout_stream_process(StdoutStream *s) {
1044         ssize_t l;
1045         int r;
1046
1047         assert(s);
1048
1049         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1050         if (l < 0) {
1051
1052                 if (errno == EAGAIN)
1053                         return 0;
1054
1055                 log_warning("Failed to read from stream: %m");
1056                 return -errno;
1057         }
1058
1059         if (l == 0) {
1060                 r = stdout_stream_scan(s, true);
1061                 if (r < 0)
1062                         return r;
1063
1064                 return 0;
1065         }
1066
1067         s->length += l;
1068         r = stdout_stream_scan(s, false);
1069         if (r < 0)
1070                 return r;
1071
1072         return 1;
1073
1074 }
1075
1076 static void stdout_stream_free(StdoutStream *s) {
1077         assert(s);
1078
1079         if (s->server) {
1080                 assert(s->server->n_stdout_streams > 0);
1081                 s->server->n_stdout_streams --;
1082                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1083         }
1084
1085         if (s->fd >= 0) {
1086                 if (s->server)
1087                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1088
1089                 close_nointr_nofail(s->fd);
1090         }
1091
1092         free(s->tag);
1093         free(s);
1094 }
1095
1096 static int stdout_stream_new(Server *s) {
1097         StdoutStream *stream;
1098         int fd, r;
1099         socklen_t len;
1100         struct epoll_event ev;
1101
1102         assert(s);
1103
1104         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1105         if (fd < 0) {
1106                 if (errno == EAGAIN)
1107                         return 0;
1108
1109                 log_error("Failed to accept stdout connection: %m");
1110                 return -errno;
1111         }
1112
1113         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1114                 log_warning("Too many stdout streams, refusing connection.");
1115                 close_nointr_nofail(fd);
1116                 return 0;
1117         }
1118
1119         stream = new0(StdoutStream, 1);
1120         if (!stream) {
1121                 log_error("Out of memory.");
1122                 close_nointr_nofail(fd);
1123                 return -ENOMEM;
1124         }
1125
1126         stream->fd = fd;
1127
1128         len = sizeof(stream->ucred);
1129         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1130                 log_error("Failed to determine peer credentials: %m");
1131                 r = -errno;
1132                 goto fail;
1133         }
1134
1135         if (shutdown(fd, SHUT_WR) < 0) {
1136                 log_error("Failed to shutdown writing side of socket: %m");
1137                 r = -errno;
1138                 goto fail;
1139         }
1140
1141         zero(ev);
1142         ev.data.ptr = stream;
1143         ev.events = EPOLLIN;
1144         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1145                 log_error("Failed to add stream to event loop: %m");
1146                 r = -errno;
1147                 goto fail;
1148         }
1149
1150         stream->server = s;
1151         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1152         s->n_stdout_streams ++;
1153
1154         return 0;
1155
1156 fail:
1157         stdout_stream_free(stream);
1158         return r;
1159 }
1160
1161 static int system_journal_open(Server *s) {
1162         int r;
1163         char *fn;
1164         sd_id128_t machine;
1165         char ids[33];
1166
1167         r = sd_id128_get_machine(&machine);
1168         if (r < 0)
1169                 return r;
1170
1171         sd_id128_to_string(machine, ids);
1172
1173         if (!s->system_journal) {
1174
1175                 /* First try to create the machine path, but not the prefix */
1176                 fn = strappend("/var/log/journal/", ids);
1177                 if (!fn)
1178                         return -ENOMEM;
1179                 (void) mkdir(fn, 0755);
1180                 free(fn);
1181
1182                 /* The create the system journal file */
1183                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1184                 if (!fn)
1185                         return -ENOMEM;
1186
1187                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1188                 free(fn);
1189
1190                 if (r >= 0) {
1191                         journal_default_metrics(&s->system_metrics, s->system_journal->fd);
1192
1193                         s->system_journal->metrics = s->system_metrics;
1194                         s->system_journal->compress = s->compress;
1195
1196                         fix_perms(s->system_journal, 0);
1197                 } else if (r < 0) {
1198
1199                         if (r == -ENOENT)
1200                                 r = 0;
1201                         else {
1202                                 log_error("Failed to open system journal: %s", strerror(-r));
1203                                 return r;
1204                         }
1205                 }
1206         }
1207
1208         if (!s->runtime_journal) {
1209
1210                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1211                 if (!fn)
1212                         return -ENOMEM;
1213
1214                 if (s->system_journal) {
1215
1216                         /* Try to open the runtime journal, but only
1217                          * if it already exists, so that we can flush
1218                          * it into the system journal */
1219
1220                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1221                         free(fn);
1222
1223                         if (r < 0) {
1224
1225                                 if (r == -ENOENT)
1226                                         r = 0;
1227                                 else {
1228                                         log_error("Failed to open runtime journal: %s", strerror(-r));
1229                                         return r;
1230                                 }
1231                         }
1232
1233                 } else {
1234
1235                         /* OK, we really need the runtime journal, so create
1236                          * it if necessary. */
1237
1238                         (void) mkdir_parents(fn, 0755);
1239                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1240                         free(fn);
1241
1242                         if (r < 0) {
1243                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1244                                 return r;
1245                         }
1246                 }
1247
1248                 if (s->runtime_journal) {
1249                         journal_default_metrics(&s->runtime_metrics, s->runtime_journal->fd);
1250
1251                         s->runtime_journal->metrics = s->runtime_metrics;
1252                         s->runtime_journal->compress = s->compress;
1253
1254                         fix_perms(s->runtime_journal, 0);
1255                 }
1256         }
1257
1258         return r;
1259 }
1260
1261 static int server_flush_to_var(Server *s) {
1262         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1263         Object *o = NULL;
1264         int r;
1265         sd_id128_t machine;
1266         sd_journal *j;
1267         usec_t ts;
1268
1269         assert(s);
1270
1271         if (!s->runtime_journal)
1272                 return 0;
1273
1274         ts = now(CLOCK_MONOTONIC);
1275         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1276                 return 0;
1277
1278         s->var_available_timestamp = ts;
1279
1280         system_journal_open(s);
1281
1282         if (!s->system_journal)
1283                 return 0;
1284
1285         r = sd_id128_get_machine(&machine);
1286         if (r < 0) {
1287                 log_error("Failed to get machine id: %s", strerror(-r));
1288                 return r;
1289         }
1290
1291         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1292         if (r < 0) {
1293                 log_error("Failed to read runtime journal: %s", strerror(-r));
1294                 return r;
1295         }
1296
1297         SD_JOURNAL_FOREACH(j) {
1298                 JournalFile *f;
1299
1300                 f = j->current_file;
1301                 assert(f && f->current_offset > 0);
1302
1303                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1304                 if (r < 0) {
1305                         log_error("Can't read entry: %s", strerror(-r));
1306                         goto finish;
1307                 }
1308
1309                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1310                 if (r == -E2BIG) {
1311                         log_info("Allocation limit reached.");
1312
1313                         journal_file_post_change(s->system_journal);
1314                         server_vacuum(s);
1315
1316                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1317                 }
1318
1319                 if (r < 0) {
1320                         log_error("Can't write entry: %s", strerror(-r));
1321                         goto finish;
1322                 }
1323         }
1324
1325 finish:
1326         journal_file_post_change(s->system_journal);
1327
1328         journal_file_close(s->runtime_journal);
1329         s->runtime_journal = NULL;
1330
1331         if (r >= 0) {
1332                 sd_id128_to_string(machine, path + 17);
1333                 rm_rf(path, false, true, false);
1334         }
1335
1336         return r;
1337 }
1338
1339 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1340         struct msghdr msghdr;
1341         struct iovec iovec;
1342         struct cmsghdr *cmsg;
1343         union {
1344                 struct cmsghdr cmsghdr;
1345                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1346                             CMSG_SPACE(sizeof(struct timeval))];
1347         } control;
1348         union sockaddr_union sa;
1349
1350         assert(s);
1351
1352         zero(msghdr);
1353
1354         zero(iovec);
1355         iovec.iov_base = (void*) buffer;
1356         iovec.iov_len = length;
1357         msghdr.msg_iov = &iovec;
1358         msghdr.msg_iovlen = 1;
1359
1360         zero(sa);
1361         sa.un.sun_family = AF_UNIX;
1362         strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1363         msghdr.msg_name = &sa;
1364         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1365
1366         zero(control);
1367         msghdr.msg_control = &control;
1368         msghdr.msg_controllen = sizeof(control);
1369
1370         cmsg = CMSG_FIRSTHDR(&msghdr);
1371         cmsg->cmsg_level = SOL_SOCKET;
1372         cmsg->cmsg_type = SCM_CREDENTIALS;
1373         cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1374         memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1375         msghdr.msg_controllen = cmsg->cmsg_len;
1376
1377         /* Forward the syslog message we received via /dev/log to
1378          * /run/systemd/syslog. Unfortunately we currently can't set
1379          * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1380
1381         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1382                 return;
1383
1384         if (errno == ESRCH) {
1385                 struct ucred u;
1386
1387                 /* Hmm, presumably the sender process vanished
1388                  * by now, so let's fix it as good as we
1389                  * can, and retry */
1390
1391                 u = *ucred;
1392                 u.pid = getpid();
1393                 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1394
1395                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1396                         return;
1397         }
1398
1399         log_debug("Failed to forward syslog message: %m");
1400 }
1401
1402 static int process_event(Server *s, struct epoll_event *ev) {
1403         assert(s);
1404
1405         if (ev->data.fd == s->signal_fd) {
1406                 struct signalfd_siginfo sfsi;
1407                 ssize_t n;
1408
1409                 if (ev->events != EPOLLIN) {
1410                         log_info("Got invalid event from epoll.");
1411                         return -EIO;
1412                 }
1413
1414                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1415                 if (n != sizeof(sfsi)) {
1416
1417                         if (n >= 0)
1418                                 return -EIO;
1419
1420                         if (errno == EINTR || errno == EAGAIN)
1421                                 return 0;
1422
1423                         return -errno;
1424                 }
1425
1426                 if (sfsi.ssi_signo == SIGUSR1) {
1427                         server_flush_to_var(s);
1428                         return 0;
1429                 }
1430
1431                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1432                 return 0;
1433
1434         } else if (ev->data.fd == s->native_fd ||
1435                    ev->data.fd == s->syslog_fd) {
1436
1437                 if (ev->events != EPOLLIN) {
1438                         log_info("Got invalid event from epoll.");
1439                         return -EIO;
1440                 }
1441
1442                 for (;;) {
1443                         struct msghdr msghdr;
1444                         struct iovec iovec;
1445                         struct ucred *ucred = NULL;
1446                         struct timeval *tv = NULL;
1447                         struct cmsghdr *cmsg;
1448                         union {
1449                                 struct cmsghdr cmsghdr;
1450                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1451                                             CMSG_SPACE(sizeof(struct timeval))];
1452                         } control;
1453                         ssize_t n;
1454                         int v;
1455
1456                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1457                                 log_error("SIOCINQ failed: %m");
1458                                 return -errno;
1459                         }
1460
1461                         if (v <= 0)
1462                                 return 1;
1463
1464                         if (s->buffer_size < (size_t) v) {
1465                                 void *b;
1466                                 size_t l;
1467
1468                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1469                                 b = realloc(s->buffer, l+1);
1470
1471                                 if (!b) {
1472                                         log_error("Couldn't increase buffer.");
1473                                         return -ENOMEM;
1474                                 }
1475
1476                                 s->buffer_size = l;
1477                                 s->buffer = b;
1478                         }
1479
1480                         zero(iovec);
1481                         iovec.iov_base = s->buffer;
1482                         iovec.iov_len = s->buffer_size;
1483
1484                         zero(control);
1485                         zero(msghdr);
1486                         msghdr.msg_iov = &iovec;
1487                         msghdr.msg_iovlen = 1;
1488                         msghdr.msg_control = &control;
1489                         msghdr.msg_controllen = sizeof(control);
1490
1491                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1492                         if (n < 0) {
1493
1494                                 if (errno == EINTR || errno == EAGAIN)
1495                                         return 1;
1496
1497                                 log_error("recvmsg() failed: %m");
1498                                 return -errno;
1499                         }
1500
1501                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1502
1503                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1504                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1505                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1506                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1507                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1508                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1509                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1510                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1511                         }
1512
1513                         if (ev->data.fd == s->syslog_fd) {
1514                                 char *e;
1515
1516                                 e = memchr(s->buffer, '\n', n);
1517                                 if (e)
1518                                         *e = 0;
1519                                 else
1520                                         s->buffer[n] = 0;
1521
1522                                 forward_syslog(s, s->buffer, n, ucred, tv);
1523                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1524                         } else
1525                                 process_native_message(s, s->buffer, n, ucred, tv);
1526                 }
1527
1528                 return 1;
1529
1530         } else if (ev->data.fd == s->stdout_fd) {
1531
1532                 if (ev->events != EPOLLIN) {
1533                         log_info("Got invalid event from epoll.");
1534                         return -EIO;
1535                 }
1536
1537                 stdout_stream_new(s);
1538                 return 1;
1539
1540         } else {
1541                 StdoutStream *stream;
1542
1543                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1544                         log_info("Got invalid event from epoll.");
1545                         return -EIO;
1546                 }
1547
1548                 /* If it is none of the well-known fds, it must be an
1549                  * stdout stream fd. Note that this is a bit ugly here
1550                  * (since we rely that none of the well-known fds
1551                  * could be interpreted as pointer), but nonetheless
1552                  * safe, since the well-known fds would never get an
1553                  * fd > 4096, i.e. beyond the first memory page */
1554
1555                 stream = ev->data.ptr;
1556
1557                 if (stdout_stream_process(stream) <= 0)
1558                         stdout_stream_free(stream);
1559
1560                 return 1;
1561         }
1562
1563         log_error("Unknown event.");
1564         return 0;
1565 }
1566
1567 static int open_syslog_socket(Server *s) {
1568         union sockaddr_union sa;
1569         int one, r;
1570         struct epoll_event ev;
1571         struct timeval tv;
1572
1573         assert(s);
1574
1575         if (s->syslog_fd < 0) {
1576
1577                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1578                 if (s->syslog_fd < 0) {
1579                         log_error("socket() failed: %m");
1580                         return -errno;
1581                 }
1582
1583                 zero(sa);
1584                 sa.un.sun_family = AF_UNIX;
1585                 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1586
1587                 unlink(sa.un.sun_path);
1588
1589                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1590                 if (r < 0) {
1591                         log_error("bind() failed: %m");
1592                         return -errno;
1593                 }
1594
1595                 chmod(sa.un.sun_path, 0666);
1596         }
1597
1598         one = 1;
1599         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1600         if (r < 0) {
1601                 log_error("SO_PASSCRED failed: %m");
1602                 return -errno;
1603         }
1604
1605         one = 1;
1606         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1607         if (r < 0) {
1608                 log_error("SO_TIMESTAMP failed: %m");
1609                 return -errno;
1610         }
1611
1612         /* Since we use the same socket for forwarding this to some
1613          * other syslog implementation, make sure we don't hang
1614          * forever */
1615         timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1616         if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1617                 log_error("SO_SNDTIMEO failed: %m");
1618                 return -errno;
1619         }
1620
1621         zero(ev);
1622         ev.events = EPOLLIN;
1623         ev.data.fd = s->syslog_fd;
1624         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1625                 log_error("Failed to add syslog server fd to epoll object: %m");
1626                 return -errno;
1627         }
1628
1629         return 0;
1630 }
1631
1632 static int open_native_socket(Server*s) {
1633         union sockaddr_union sa;
1634         int one, r;
1635         struct epoll_event ev;
1636
1637         assert(s);
1638
1639         if (s->native_fd < 0) {
1640
1641                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1642                 if (s->native_fd < 0) {
1643                         log_error("socket() failed: %m");
1644                         return -errno;
1645                 }
1646
1647                 zero(sa);
1648                 sa.un.sun_family = AF_UNIX;
1649                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1650
1651                 unlink(sa.un.sun_path);
1652
1653                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1654                 if (r < 0) {
1655                         log_error("bind() failed: %m");
1656                         return -errno;
1657                 }
1658
1659                 chmod(sa.un.sun_path, 0666);
1660         }
1661
1662         one = 1;
1663         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1664         if (r < 0) {
1665                 log_error("SO_PASSCRED failed: %m");
1666                 return -errno;
1667         }
1668
1669         one = 1;
1670         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1671         if (r < 0) {
1672                 log_error("SO_TIMESTAMP failed: %m");
1673                 return -errno;
1674         }
1675
1676         zero(ev);
1677         ev.events = EPOLLIN;
1678         ev.data.fd = s->native_fd;
1679         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1680                 log_error("Failed to add native server fd to epoll object: %m");
1681                 return -errno;
1682         }
1683
1684         return 0;
1685 }
1686
1687 static int open_stdout_socket(Server *s) {
1688         union sockaddr_union sa;
1689         int r;
1690         struct epoll_event ev;
1691
1692         assert(s);
1693
1694         if (s->stdout_fd < 0) {
1695
1696                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1697                 if (s->stdout_fd < 0) {
1698                         log_error("socket() failed: %m");
1699                         return -errno;
1700                 }
1701
1702                 zero(sa);
1703                 sa.un.sun_family = AF_UNIX;
1704                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1705
1706                 unlink(sa.un.sun_path);
1707
1708                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1709                 if (r < 0) {
1710                         log_error("bind() failed: %m");
1711                         return -errno;
1712                 }
1713
1714                 chmod(sa.un.sun_path, 0666);
1715
1716                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1717                         log_error("liste() failed: %m");
1718                         return -errno;
1719                 }
1720         }
1721
1722         zero(ev);
1723         ev.events = EPOLLIN;
1724         ev.data.fd = s->stdout_fd;
1725         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1726                 log_error("Failed to add stdout server fd to epoll object: %m");
1727                 return -errno;
1728         }
1729
1730         return 0;
1731 }
1732
1733 static int open_signalfd(Server *s) {
1734         sigset_t mask;
1735         struct epoll_event ev;
1736
1737         assert(s);
1738
1739         assert_se(sigemptyset(&mask) == 0);
1740         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1741         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1742
1743         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1744         if (s->signal_fd < 0) {
1745                 log_error("signalfd(): %m");
1746                 return -errno;
1747         }
1748
1749         zero(ev);
1750         ev.events = EPOLLIN;
1751         ev.data.fd = s->signal_fd;
1752
1753         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1754                 log_error("epoll_ctl(): %m");
1755                 return -errno;
1756         }
1757
1758         return 0;
1759 }
1760
1761 static int server_init(Server *s) {
1762         int n, r, fd;
1763
1764         assert(s);
1765
1766         zero(*s);
1767         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1768         s->compress = true;
1769
1770         memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics));
1771         memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics));
1772
1773         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1774         if (!s->user_journals) {
1775                 log_error("Out of memory.");
1776                 return -ENOMEM;
1777         }
1778
1779         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1780         if (s->epoll_fd < 0) {
1781                 log_error("Failed to create epoll object: %m");
1782                 return -errno;
1783         }
1784
1785         n = sd_listen_fds(true);
1786         if (n < 0) {
1787                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1788                 return n;
1789         }
1790
1791         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1792
1793                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1794
1795                         if (s->native_fd >= 0) {
1796                                 log_error("Too many native sockets passed.");
1797                                 return -EINVAL;
1798                         }
1799
1800                         s->native_fd = fd;
1801
1802                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1803
1804                         if (s->stdout_fd >= 0) {
1805                                 log_error("Too many stdout sockets passed.");
1806                                 return -EINVAL;
1807                         }
1808
1809                         s->stdout_fd = fd;
1810
1811                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1812
1813                         if (s->syslog_fd >= 0) {
1814                                 log_error("Too many /dev/log sockets passed.");
1815                                 return -EINVAL;
1816                         }
1817
1818                         s->syslog_fd = fd;
1819
1820                 } else {
1821                         log_error("Unknown socket passed.");
1822                         return -EINVAL;
1823                 }
1824         }
1825
1826         r = open_syslog_socket(s);
1827         if (r < 0)
1828                 return r;
1829
1830         r = open_native_socket(s);
1831         if (r < 0)
1832                 return r;
1833
1834         r = open_stdout_socket(s);
1835         if (r < 0)
1836                 return r;
1837
1838         r = system_journal_open(s);
1839         if (r < 0)
1840                 return r;
1841
1842         r = open_signalfd(s);
1843         if (r < 0)
1844                 return r;
1845
1846         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1847         if (!s->rate_limit)
1848                 return -ENOMEM;
1849
1850         return 0;
1851 }
1852
1853 static void server_done(Server *s) {
1854         JournalFile *f;
1855         assert(s);
1856
1857         while (s->stdout_streams)
1858                 stdout_stream_free(s->stdout_streams);
1859
1860         if (s->system_journal)
1861                 journal_file_close(s->system_journal);
1862
1863         if (s->runtime_journal)
1864                 journal_file_close(s->runtime_journal);
1865
1866         while ((f = hashmap_steal_first(s->user_journals)))
1867                 journal_file_close(f);
1868
1869         hashmap_free(s->user_journals);
1870
1871         if (s->epoll_fd >= 0)
1872                 close_nointr_nofail(s->epoll_fd);
1873
1874         if (s->signal_fd >= 0)
1875                 close_nointr_nofail(s->signal_fd);
1876
1877         if (s->syslog_fd >= 0)
1878                 close_nointr_nofail(s->syslog_fd);
1879
1880         if (s->native_fd >= 0)
1881                 close_nointr_nofail(s->native_fd);
1882
1883         if (s->stdout_fd >= 0)
1884                 close_nointr_nofail(s->stdout_fd);
1885
1886         if (s->rate_limit)
1887                 journal_rate_limit_free(s->rate_limit);
1888
1889         free(s->buffer);
1890 }
1891
1892 int main(int argc, char *argv[]) {
1893         Server server;
1894         int r;
1895
1896         /* if (getppid() != 1) { */
1897         /*         log_error("This program should be invoked by init only."); */
1898         /*         return EXIT_FAILURE; */
1899         /* } */
1900
1901         if (argc > 1) {
1902                 log_error("This program does not take arguments.");
1903                 return EXIT_FAILURE;
1904         }
1905
1906         log_set_target(LOG_TARGET_CONSOLE);
1907         log_parse_environment();
1908         log_open();
1909
1910         umask(0022);
1911
1912         r = server_init(&server);
1913         if (r < 0)
1914                 goto finish;
1915
1916         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1917
1918         sd_notify(false,
1919                   "READY=1\n"
1920                   "STATUS=Processing requests...");
1921
1922         server_vacuum(&server);
1923         server_flush_to_var(&server);
1924
1925         for (;;) {
1926                 struct epoll_event event;
1927
1928                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1929                 if (r < 0) {
1930
1931                         if (errno == EINTR)
1932                                 continue;
1933
1934                         log_error("epoll_wait() failed: %m");
1935                         r = -errno;
1936                         goto finish;
1937                 } else if (r == 0)
1938                         break;
1939
1940                 r = process_event(&server, &event);
1941                 if (r < 0)
1942                         goto finish;
1943                 else if (r == 0)
1944                         break;
1945         }
1946
1947         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1948
1949 finish:
1950         sd_notify(false,
1951                   "STATUS=Shutting down...");
1952
1953         server_done(&server);
1954
1955         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1956 }