chiark / gitweb /
Merge branch 'journal'
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "journal-internal.h"
45
/* Upper bound on the number of per-user journal files kept open at the
 * same time; find_journal() closes one before opening another once this
 * limit is reached. */
#define USER_JOURNALS_MAX 1024
/* NOTE(review): presumably the maximum number of simultaneously
 * connected stdout streams; not enforced in the visible part of the
 * file -- confirm. */
#define STDOUT_STREAMS_MAX 4096

/* NOTE(review): presumably the defaults handed to the rate limiter
 * (interval in usec, burst in messages per interval); the consumer is
 * outside the visible part of the file -- confirm. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long a value computed by available_space() is reused before the
 * journal directory is scanned again. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* NOTE(review): presumably the minimum interval between checks whether
 * /var has become available (see Server.var_available_timestamp) --
 * confirm against server_flush_to_var(). */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): timeout applied somewhere in the syslog path; the user
 * is outside the visible part of the file -- confirm. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)
57
58 typedef struct StdoutStream StdoutStream;
59
/* Global state of the journal daemon. */
typedef struct Server {
        /* NOTE(review): presumably the epoll instance plus the signal,
         * syslog, native-protocol and stdout-stream descriptors it
         * watches; they are set up outside the visible part of the
         * file -- confirm. */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* While runtime_journal is open, find_journal() directs all
         * entries to it. Otherwise entries go to system_journal or to
         * a per-user journal from user_journals, which is keyed by
         * UID. */
        JournalFile *runtime_journal;
        JournalFile *system_journal;
        Hashmap *user_journals;

        /* Sequence number state passed to journal_file_append_entry()
         * for every entry written. */
        uint64_t seqnum;

        /* NOTE(review): presumably a receive buffer for incoming
         * datagrams; not used in the visible part of the file. */
        char *buffer;
        size_t buffer_size;

        /* Rate limiter state consulted by dispatch_message(). */
        JournalRateLimit *rate_limit;

        /* Size limits (max_use, keep_free, ...) and the compression
         * setting; both are copied into every user journal opened by
         * find_journal(). */
        JournalMetrics metrics;
        bool compress;

        /* Result of the last available_space() computation and the
         * CLOCK_MONOTONIC time it was taken at. server_vacuum() resets
         * the timestamp to 0 to invalidate the cache. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* NOTE(review): presumably the time of the last check whether
         * /var is available; maintained outside the visible part of
         * the file -- confirm. */
        uint64_t var_available_timestamp;

        /* Stdout stream connections and how many there are. */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
89
/* Parser state of a stdout stream. stdout_stream_line() consumes one
 * header line per state, in the order listed here, and then stays in
 * STDOUT_STREAM_RUNNING, where every further line is logged. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,             /* next line: tag prepended to messages (may be empty) */
        STDOUT_STREAM_PRIORITY,        /* next line: default priority, one digit 0..7 */
        STDOUT_STREAM_PRIORITY_PREFIX, /* next line: 0 or 1, whether "<N>" line prefixes are parsed */
        STDOUT_STREAM_TEE_CONSOLE,     /* next line: 0 or 1, whether lines are copied to /dev/console */
        STDOUT_STREAM_RUNNING          /* header complete, lines are log payload */
} StdoutStreamState;
97
/* State of a single stdout stream connection. */
struct StdoutStream {
        Server *server;          /* server the stream's messages are dispatched to */
        StdoutStreamState state; /* which header line is expected next */

        int fd;

        /* Credentials attached to every message logged from this stream. */
        struct ucred ucred;

        /* Settings parsed from the header lines by stdout_stream_line().
         * tag stays NULL when the tag line was empty. */
        char *tag;
        int priority;
        bool priority_prefix:1;
        bool tee_console:1;

        /* Data received but not yet split into lines; length is the
         * number of valid bytes in buffer. */
        char buffer[LINE_MAX+1];
        size_t length;

        LIST_FIELDS(StdoutStream, stdout_stream);
};
116
117 static int server_flush_to_var(Server *s);
118
119 static uint64_t available_space(Server *s) {
120         char ids[33];
121         sd_id128_t machine;
122         char *p;
123         const char *f;
124         struct statvfs ss;
125         uint64_t sum = 0, avail = 0, ss_avail = 0;
126         int r;
127         DIR *d;
128         usec_t ts = now(CLOCK_MONOTONIC);
129
130         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
131                 return s->cached_available_space;
132
133         r = sd_id128_get_machine(&machine);
134         if (r < 0)
135                 return 0;
136
137         if (s->system_journal)
138                 f = "/var/log/journal/";
139         else
140                 f = "/run/log/journal/";
141
142         p = strappend(f, sd_id128_to_string(machine, ids));
143         if (!p)
144                 return 0;
145
146         d = opendir(p);
147         free(p);
148
149         if (!d)
150                 return 0;
151
152         if (fstatvfs(dirfd(d), &ss) < 0)
153                 goto finish;
154
155         for (;;) {
156                 struct stat st;
157                 struct dirent buf, *de;
158                 int k;
159
160                 k = readdir_r(d, &buf, &de);
161                 if (k != 0) {
162                         r = -k;
163                         goto finish;
164                 }
165
166                 if (!de)
167                         break;
168
169                 if (!dirent_is_file_with_suffix(de, ".journal"))
170                         continue;
171
172                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
173                         continue;
174
175                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
176         }
177
178         avail = sum >= s->metrics.max_use ? 0 : s->metrics.max_use - sum;
179
180         ss_avail = ss.f_bsize * ss.f_bavail;
181
182         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
183
184         if (ss_avail < avail)
185                 avail = ss_avail;
186
187         s->cached_available_space = avail;
188         s->cached_available_space_timestamp = ts;
189
190 finish:
191         closedir(d);
192
193         return avail;
194 }
195
196 static void fix_perms(JournalFile *f, uid_t uid) {
197         acl_t acl;
198         acl_entry_t entry;
199         acl_permset_t permset;
200         int r;
201
202         assert(f);
203
204         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
205         if (r < 0)
206                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
207
208         if (uid <= 0)
209                 return;
210
211         acl = acl_get_fd(f->fd);
212         if (!acl) {
213                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
214                 return;
215         }
216
217         r = acl_find_uid(acl, uid, &entry);
218         if (r <= 0) {
219
220                 if (acl_create_entry(&acl, &entry) < 0 ||
221                     acl_set_tag_type(entry, ACL_USER) < 0 ||
222                     acl_set_qualifier(entry, &uid) < 0) {
223                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
224                         goto finish;
225                 }
226         }
227
228         if (acl_get_permset(entry, &permset) < 0 ||
229             acl_add_perm(permset, ACL_READ) < 0 ||
230             acl_calc_mask(&acl) < 0) {
231                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
232                 goto finish;
233         }
234
235         if (acl_set_fd(f->fd, acl) < 0)
236                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
237
238 finish:
239         acl_free(acl);
240 }
241
242 static JournalFile* find_journal(Server *s, uid_t uid) {
243         char *p;
244         int r;
245         JournalFile *f;
246         char ids[33];
247         sd_id128_t machine;
248
249         assert(s);
250
251         /* We split up user logs only on /var, not on /run. If the
252          * runtime file is open, we write to it exclusively, in order
253          * to guarantee proper order as soon as we flush /run to
254          * /var and close the runtime file. */
255
256         if (s->runtime_journal)
257                 return s->runtime_journal;
258
259         if (uid <= 0)
260                 return s->system_journal;
261
262         r = sd_id128_get_machine(&machine);
263         if (r < 0)
264                 return s->system_journal;
265
266         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
267         if (f)
268                 return f;
269
270         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
271                 return s->system_journal;
272
273         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
274                 /* Too many open? Then let's close one */
275                 f = hashmap_steal_first(s->user_journals);
276                 assert(f);
277                 journal_file_close(f);
278         }
279
280         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
281         free(p);
282
283         if (r < 0)
284                 return s->system_journal;
285
286         fix_perms(f, uid);
287         f->metrics = s->metrics;
288         f->compress = s->compress;
289
290         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
291         if (r < 0) {
292                 journal_file_close(f);
293                 return s->system_journal;
294         }
295
296         return f;
297 }
298
299 static void server_vacuum(Server *s) {
300         Iterator i;
301         void *k;
302         char *p;
303         char ids[33];
304         sd_id128_t machine;
305         int r;
306         JournalFile *f;
307
308         log_info("Rotating...");
309
310         if (s->runtime_journal) {
311                 r = journal_file_rotate(&s->runtime_journal);
312                 if (r < 0)
313                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
314         }
315
316         if (s->system_journal) {
317                 r = journal_file_rotate(&s->system_journal);
318                 if (r < 0)
319                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
320         }
321
322         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
323                 r = journal_file_rotate(&f);
324                 if (r < 0)
325                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
326                 else
327                         hashmap_replace(s->user_journals, k, f);
328         }
329
330         log_info("Vacuuming...");
331
332         r = sd_id128_get_machine(&machine);
333         if (r < 0) {
334                 log_error("Failed to get machine ID: %s", strerror(-r));
335                 return;
336         }
337
338         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
339                 log_error("Out of memory.");
340                 return;
341         }
342
343         r = journal_directory_vacuum(p, s->metrics.max_use, s->metrics.keep_free);
344         if (r < 0 && r != -ENOENT)
345                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
346         free(p);
347
348         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
349                 log_error("Out of memory.");
350                 return;
351         }
352
353         r = journal_directory_vacuum(p, s->metrics.max_use, s->metrics.keep_free);
354         if (r < 0 && r != -ENOENT)
355                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
356         free(p);
357
358         s->cached_available_space_timestamp = 0;
359 }
360
361 static char *shortened_cgroup_path(pid_t pid) {
362         int r;
363         char *process_path, *init_path, *path;
364
365         assert(pid > 0);
366
367         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
368         if (r < 0)
369                 return NULL;
370
371         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
372         if (r < 0) {
373                 free(process_path);
374                 return NULL;
375         }
376
377         if (streq(init_path, "/"))
378                 init_path[0] = 0;
379
380         if (startswith(process_path, init_path)) {
381                 char *p;
382
383                 p = strdup(process_path + strlen(init_path));
384                 if (!p) {
385                         free(process_path);
386                         free(init_path);
387                         return NULL;
388                 }
389                 path = p;
390         } else {
391                 path = process_path;
392                 process_path = NULL;
393         }
394
395         free(process_path);
396         free(init_path);
397
398         return path;
399 }
400
401 static void dispatch_message_real(Server *s,
402                              struct iovec *iovec, unsigned n, unsigned m,
403                              struct ucred *ucred,
404                              struct timeval *tv) {
405
406         char *pid = NULL, *uid = NULL, *gid = NULL,
407                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
408                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
409                 *audit_session = NULL, *audit_loginuid = NULL,
410                 *exe = NULL, *cgroup = NULL;
411
412         char idbuf[33];
413         sd_id128_t id;
414         int r;
415         char *t;
416         uid_t loginuid = 0, realuid = 0;
417         JournalFile *f;
418         bool vacuumed = false;
419
420         assert(s);
421         assert(iovec);
422         assert(n > 0);
423         assert(n + 13 <= m);
424
425         if (ucred) {
426                 uint32_t session;
427                 char *path;
428
429                 realuid = ucred->uid;
430
431                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
432                         IOVEC_SET_STRING(iovec[n++], pid);
433
434                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
435                         IOVEC_SET_STRING(iovec[n++], uid);
436
437                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
438                         IOVEC_SET_STRING(iovec[n++], gid);
439
440                 r = get_process_comm(ucred->pid, &t);
441                 if (r >= 0) {
442                         comm = strappend("_COMM=", t);
443                         if (comm)
444                                 IOVEC_SET_STRING(iovec[n++], comm);
445                         free(t);
446                 }
447
448                 r = get_process_exe(ucred->pid, &t);
449                 if (r >= 0) {
450                         exe = strappend("_EXE=", t);
451                         if (comm)
452                                 IOVEC_SET_STRING(iovec[n++], exe);
453                         free(t);
454                 }
455
456                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
457                 if (r >= 0) {
458                         cmdline = strappend("_CMDLINE=", t);
459                         if (cmdline)
460                                 IOVEC_SET_STRING(iovec[n++], cmdline);
461                         free(t);
462                 }
463
464                 r = audit_session_from_pid(ucred->pid, &session);
465                 if (r >= 0)
466                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
467                                 IOVEC_SET_STRING(iovec[n++], audit_session);
468
469                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
470                 if (r >= 0)
471                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
472                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
473
474                 path = shortened_cgroup_path(ucred->pid);
475                 if (path) {
476                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
477                         if (cgroup)
478                                 IOVEC_SET_STRING(iovec[n++], cgroup);
479
480                         free(path);
481                 }
482         }
483
484         if (tv) {
485                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
486                              (unsigned long long) timeval_load(tv)) >= 0)
487                         IOVEC_SET_STRING(iovec[n++], source_time);
488         }
489
490         /* Note that strictly speaking storing the boot id here is
491          * redundant since the entry includes this in-line
492          * anyway. However, we need this indexed, too. */
493         r = sd_id128_get_boot(&id);
494         if (r >= 0)
495                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
496                         IOVEC_SET_STRING(iovec[n++], boot_id);
497
498         r = sd_id128_get_machine(&id);
499         if (r >= 0)
500                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
501                         IOVEC_SET_STRING(iovec[n++], machine_id);
502
503         t = gethostname_malloc();
504         if (t) {
505                 hostname = strappend("_HOSTNAME=", t);
506                 if (hostname)
507                         IOVEC_SET_STRING(iovec[n++], hostname);
508                 free(t);
509         }
510
511         assert(n <= m);
512
513         server_flush_to_var(s);
514
515 retry:
516         f = find_journal(s, realuid == 0 ? 0 : loginuid);
517         if (!f)
518                 log_warning("Dropping message, as we can't find a place to store the data.");
519         else {
520                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
521
522                 if (r == -E2BIG && !vacuumed) {
523                         log_info("Allocation limit reached.");
524
525                         server_vacuum(s);
526                         vacuumed = true;
527
528                         log_info("Retrying write.");
529                         goto retry;
530                 }
531
532                 if (r < 0)
533                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
534         }
535
536         free(pid);
537         free(uid);
538         free(gid);
539         free(comm);
540         free(exe);
541         free(cmdline);
542         free(source_time);
543         free(boot_id);
544         free(machine_id);
545         free(hostname);
546         free(audit_session);
547         free(audit_loginuid);
548         free(cgroup);
549 }
550
551 static void dispatch_message(Server *s,
552                              struct iovec *iovec, unsigned n, unsigned m,
553                              struct ucred *ucred,
554                              struct timeval *tv,
555                              int priority) {
556         int rl;
557         char *path = NULL, *c;
558
559         assert(s);
560         assert(iovec || n == 0);
561
562         if (n == 0)
563                 return;
564
565         if (!ucred)
566                 goto finish;
567
568         path = shortened_cgroup_path(ucred->pid);
569         if (!path)
570                 goto finish;
571
572         /* example: /user/lennart/3/foobar
573          *          /system/dbus.service/foobar
574          *
575          * So let's cut of everything past the third /, since that is
576          * wher user directories start */
577
578         c = strchr(path, '/');
579         if (c) {
580                 c = strchr(c+1, '/');
581                 if (c) {
582                         c = strchr(c+1, '/');
583                         if (c)
584                                 *c = 0;
585                 }
586         }
587
588         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
589
590         if (rl == 0) {
591                 free(path);
592                 return;
593         }
594
595         if (rl > 1) {
596                 int j = 0;
597                 char suppress_message[LINE_MAX];
598                 struct iovec suppress_iovec[15];
599
600                 /* Write a suppression message if we suppressed something */
601
602                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
603                 char_array_0(suppress_message);
604
605                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
606                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
607
608                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
609         }
610
611         free(path);
612
613 finish:
614         dispatch_message_real(s, iovec, n, m, ucred, tv);
615 }
616
617 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
618         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
619         struct iovec iovec[16];
620         unsigned n = 0;
621         int priority = LOG_USER | LOG_INFO;
622
623         assert(s);
624         assert(buf);
625
626         parse_syslog_priority((char**) &buf, &priority);
627         skip_syslog_date((char**) &buf);
628
629         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
630                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
631
632         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
633                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
634
635         message = strappend("MESSAGE=", buf);
636         if (message)
637                 IOVEC_SET_STRING(iovec[n++], message);
638
639         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
640
641         free(message);
642         free(syslog_facility);
643         free(syslog_priority);
644 }
645
/* Checks whether the l bytes at p form a field name a client may set.
 *
 * This roughly follows the POSIX syntax recommendations for
 * environment variable names, with a few extra restrictions:
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
 *
 * A name is accepted if it is 1..64 bytes long, does not begin with
 * an underscore (those names are protected) or a digit, and consists
 * of A-Z, 0-9 and '_' only. */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        /* Neither empty nor overly long names */
        if (l <= 0 || l > 64)
                return false;

        /* The first character has to be an upper-case letter:
         * underscores are reserved, digits not permitted up front */
        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                char ch = p[i];
                bool is_upper = ch >= 'A' && ch <= 'Z';
                bool is_digit = ch >= '0' && ch <= '9';

                if (!is_upper && !is_digit && ch != '_')
                        return false;
        }

        return true;
}
680
681 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
682         struct iovec *iovec = NULL;
683         unsigned n = 0, m = 0, j;
684         const char *p;
685         size_t remaining;
686         int priority = LOG_INFO;
687
688         assert(s);
689         assert(buffer || n == 0);
690
691         p = buffer;
692         remaining = buffer_size;
693
694         while (remaining > 0) {
695                 const char *e, *q;
696
697                 e = memchr(p, '\n', remaining);
698
699                 if (!e) {
700                         /* Trailing noise, let's ignore it, and flush what we collected */
701                         log_debug("Received message with trailing noise, ignoring.");
702                         break;
703                 }
704
705                 if (e == p) {
706                         /* Entry separator */
707                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
708                         n = 0;
709                         priority = LOG_INFO;
710
711                         p++;
712                         remaining--;
713                         continue;
714                 }
715
716                 if (*p == '.' || *p == '#') {
717                         /* Ignore control commands for now, and
718                          * comments too. */
719                         remaining -= (e - p) + 1;
720                         p = e + 1;
721                         continue;
722                 }
723
724                 /* A property follows */
725
726                 if (n+13 >= m) {
727                         struct iovec *c;
728                         unsigned u;
729
730                         u = MAX((n+13U) * 2U, 4U);
731                         c = realloc(iovec, u * sizeof(struct iovec));
732                         if (!c) {
733                                 log_error("Out of memory");
734                                 break;
735                         }
736
737                         iovec = c;
738                         m = u;
739                 }
740
741                 q = memchr(p, '=', e - p);
742                 if (q) {
743                         if (valid_user_field(p, q - p)) {
744                                 /* If the field name starts with an
745                                  * underscore, skip the variable,
746                                  * since that indidates a trusted
747                                  * field */
748                                 iovec[n].iov_base = (char*) p;
749                                 iovec[n].iov_len = e - p;
750                                 n++;
751
752                                 /* We need to determine the priority
753                                  * of this entry for the rate limiting
754                                  * logic */
755                                 if (e - p == 10 &&
756                                     memcmp(p, "PRIORITY=", 10) == 0 &&
757                                     p[10] >= '0' &&
758                                     p[10] <= '9')
759                                         priority = p[10] - '0';
760                         }
761
762                         remaining -= (e - p) + 1;
763                         p = e + 1;
764                         continue;
765                 } else {
766                         uint64_t l;
767                         char *k;
768
769                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
770                                 log_debug("Failed to parse message, ignoring.");
771                                 break;
772                         }
773
774                         memcpy(&l, e + 1, sizeof(uint64_t));
775                         l = le64toh(l);
776
777                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
778                             e[1+sizeof(uint64_t)+l] != '\n') {
779                                 log_debug("Failed to parse message, ignoring.");
780                                 break;
781                         }
782
783                         k = malloc((e - p) + 1 + l);
784                         if (!k) {
785                                 log_error("Out of memory");
786                                 break;
787                         }
788
789                         memcpy(k, p, e - p);
790                         k[e - p] = '=';
791                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
792
793                         if (valid_user_field(p, e - p)) {
794                                 iovec[n].iov_base = k;
795                                 iovec[n].iov_len = (e - p) + 1 + l;
796                                 n++;
797                         } else
798                                 free(k);
799
800                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
801                         p = e + 1 + sizeof(uint64_t) + l + 1;
802                 }
803         }
804
805         dispatch_message(s, iovec, n, m, ucred, tv, priority);
806
807         for (j = 0; j < n; j++)
808                 if (iovec[j].iov_base < buffer ||
809                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
810                         free(iovec[j].iov_base);
811 }
812
813 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
814         struct iovec iovec[15];
815         char *message = NULL, *syslog_priority = NULL;
816         unsigned n = 0;
817         size_t tag_len;
818         int priority;
819
820         assert(s);
821         assert(p);
822
823         priority = s->priority;
824
825         if (s->priority_prefix &&
826             l > 3 &&
827             p[0] == '<' &&
828             p[1] >= '0' && p[1] <= '7' &&
829             p[2] == '>') {
830
831                 priority = p[1] - '0';
832                 p += 3;
833                 l -= 3;
834         }
835
836         if (l <= 0)
837                 return 0;
838
839         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
840                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
841
842         tag_len = s->tag ? strlen(s->tag) + 2: 0;
843         message = malloc(8 + tag_len + l);
844         if (message) {
845                 memcpy(message, "MESSAGE=", 8);
846
847                 if (s->tag) {
848                         memcpy(message+8, s->tag, tag_len-2);
849                         memcpy(message+8+tag_len-2, ": ", 2);
850                 }
851
852                 memcpy(message+8+tag_len, p, l);
853                 iovec[n].iov_base = message;
854                 iovec[n].iov_len = 8+tag_len+l;
855                 n++;
856         }
857
858         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
859
860         if (s->tee_console) {
861                 int console;
862
863                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
864                 if (console >= 0) {
865                         n = 0;
866                         if (s->tag) {
867                                 IOVEC_SET_STRING(iovec[n++], s->tag);
868                                 IOVEC_SET_STRING(iovec[n++], ": ");
869                         }
870
871                         iovec[n].iov_base = (void*) p;
872                         iovec[n].iov_len = l;
873                         n++;
874
875                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
876
877                         writev(console, iovec, n);
878                 }
879         }
880
881         free(message);
882         free(syslog_priority);
883
884         return 0;
885 }
886
887 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
888         assert(s);
889         assert(p);
890
891         while (l > 0 && strchr(WHITESPACE, *p)) {
892                 l--;
893                 p++;
894         }
895
896         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
897                 l--;
898
899         switch (s->state) {
900
901         case STDOUT_STREAM_TAG:
902
903                 if (l > 0) {
904                         s->tag = strndup(p, l);
905                         if (!s->tag) {
906                                 log_error("Out of memory");
907                                 return -EINVAL;
908                         }
909                 }
910
911                 s->state = STDOUT_STREAM_PRIORITY;
912                 return 0;
913
914         case STDOUT_STREAM_PRIORITY:
915                 if (l != 1 || *p < '0' || *p > '7') {
916                         log_warning("Failed to parse log priority line.");
917                         return -EINVAL;
918                 }
919
920                 s->priority = *p - '0';
921                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
922                 return 0;
923
924         case STDOUT_STREAM_PRIORITY_PREFIX:
925                 if (l != 1 || *p < '0' || *p > '1') {
926                         log_warning("Failed to parse priority prefix line.");
927                         return -EINVAL;
928                 }
929
930                 s->priority_prefix = *p - '0';
931                 s->state = STDOUT_STREAM_TEE_CONSOLE;
932                 return 0;
933
934         case STDOUT_STREAM_TEE_CONSOLE:
935                 if (l != 1 || *p < '0' || *p > '1') {
936                         log_warning("Failed to parse tee to console line.");
937                         return -EINVAL;
938                 }
939
940                 s->tee_console = *p - '0';
941                 s->state = STDOUT_STREAM_RUNNING;
942                 return 0;
943
944         case STDOUT_STREAM_RUNNING:
945                 return stdout_stream_log(s, p, l);
946         }
947
948         assert_not_reached("Unknown stream state");
949 }
950
951 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
952         char *p;
953         size_t remaining;
954         int r;
955
956         assert(s);
957
958         p = s->buffer;
959         remaining = s->length;
960         for (;;) {
961                 char *end;
962                 size_t skip;
963
964                 end = memchr(p, '\n', remaining);
965                 if (!end) {
966                         if (remaining >= LINE_MAX) {
967                                 end = p + LINE_MAX;
968                                 skip = LINE_MAX;
969                         } else
970                                 break;
971                 } else
972                         skip = end - p + 1;
973
974                 r = stdout_stream_line(s, p, end - p);
975                 if (r < 0)
976                         return r;
977
978                 remaining -= skip;
979                 p += skip;
980         }
981
982         if (force_flush && remaining > 0) {
983                 r = stdout_stream_line(s, p, remaining);
984                 if (r < 0)
985                         return r;
986
987                 p += remaining;
988                 remaining = 0;
989         }
990
991         if (p > s->buffer) {
992                 memmove(s->buffer, p, remaining);
993                 s->length = remaining;
994         }
995
996         return 0;
997 }
998
999 static int stdout_stream_process(StdoutStream *s) {
1000         ssize_t l;
1001         int r;
1002
1003         assert(s);
1004
1005         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1006         if (l < 0) {
1007
1008                 if (errno == EAGAIN)
1009                         return 0;
1010
1011                 log_warning("Failed to read from stream: %m");
1012                 return -errno;
1013         }
1014
1015         if (l == 0) {
1016                 r = stdout_stream_scan(s, true);
1017                 if (r < 0)
1018                         return r;
1019
1020                 return 0;
1021         }
1022
1023         s->length += l;
1024         r = stdout_stream_scan(s, false);
1025         if (r < 0)
1026                 return r;
1027
1028         return 1;
1029
1030 }
1031
1032 static void stdout_stream_free(StdoutStream *s) {
1033         assert(s);
1034
1035         if (s->server) {
1036                 assert(s->server->n_stdout_streams > 0);
1037                 s->server->n_stdout_streams --;
1038                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1039         }
1040
1041         if (s->fd >= 0) {
1042                 if (s->server)
1043                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1044
1045                 close_nointr_nofail(s->fd);
1046         }
1047
1048         free(s->tag);
1049         free(s);
1050 }
1051
1052 static int stdout_stream_new(Server *s) {
1053         StdoutStream *stream;
1054         int fd, r;
1055         socklen_t len;
1056         struct epoll_event ev;
1057
1058         assert(s);
1059
1060         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1061         if (fd < 0) {
1062                 if (errno == EAGAIN)
1063                         return 0;
1064
1065                 log_error("Failed to accept stdout connection: %m");
1066                 return -errno;
1067         }
1068
1069         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1070                 log_warning("Too many stdout streams, refusing connection.");
1071                 close_nointr_nofail(fd);
1072                 return 0;
1073         }
1074
1075         stream = new0(StdoutStream, 1);
1076         if (!stream) {
1077                 log_error("Out of memory.");
1078                 close_nointr_nofail(fd);
1079                 return -ENOMEM;
1080         }
1081
1082         stream->fd = fd;
1083
1084         len = sizeof(stream->ucred);
1085         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1086                 log_error("Failed to determine peer credentials: %m");
1087                 r = -errno;
1088                 goto fail;
1089         }
1090
1091         if (shutdown(fd, SHUT_WR) < 0) {
1092                 log_error("Failed to shutdown writing side of socket: %m");
1093                 r = -errno;
1094                 goto fail;
1095         }
1096
1097         zero(ev);
1098         ev.data.ptr = stream;
1099         ev.events = EPOLLIN;
1100         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1101                 log_error("Failed to add stream to event loop: %m");
1102                 r = -errno;
1103                 goto fail;
1104         }
1105
1106         stream->server = s;
1107         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1108         s->n_stdout_streams ++;
1109
1110         return 0;
1111
1112 fail:
1113         stdout_stream_free(stream);
1114         return r;
1115 }
1116
1117 static int system_journal_open(Server *s) {
1118         int r;
1119         char *fn;
1120         sd_id128_t machine;
1121         char ids[33];
1122
1123         r = sd_id128_get_machine(&machine);
1124         if (r < 0)
1125                 return r;
1126
1127         sd_id128_to_string(machine, ids);
1128
1129         if (!s->system_journal) {
1130
1131                 /* First try to create the machine path, but not the prefix */
1132                 fn = strappend("/var/log/journal/", ids);
1133                 if (!fn)
1134                         return -ENOMEM;
1135                 (void) mkdir(fn, 0755);
1136                 free(fn);
1137
1138                 /* The create the system journal file */
1139                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1140                 if (!fn)
1141                         return -ENOMEM;
1142
1143                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1144                 free(fn);
1145
1146                 if (r >= 0) {
1147                         s->system_journal->metrics = s->metrics;
1148                         s->system_journal->compress = s->compress;
1149
1150                         fix_perms(s->system_journal, 0);
1151                 } else if (r < 0) {
1152
1153                         if (r == -ENOENT)
1154                                 r = 0;
1155                         else {
1156                                 log_error("Failed to open system journal: %s", strerror(-r));
1157                                 return r;
1158                         }
1159                 }
1160         }
1161
1162         if (!s->runtime_journal) {
1163
1164                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1165                 if (!fn)
1166                         return -ENOMEM;
1167
1168                 if (s->system_journal) {
1169
1170                         /* Try to open the runtime journal, but only
1171                          * if it already exists, so that we can flush
1172                          * it into the system journal */
1173
1174                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1175                         free(fn);
1176
1177                         if (r < 0) {
1178
1179                                 if (r == -ENOENT)
1180                                         r = 0;
1181                                 else {
1182                                         log_error("Failed to open runtime journal: %s", strerror(-r));
1183                                         return r;
1184                                 }
1185                         }
1186
1187                 } else {
1188
1189                         /* OK, we really need the runtime journal, so create
1190                          * it if necessary. */
1191
1192                         (void) mkdir_parents(fn, 0755);
1193                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1194                         free(fn);
1195
1196                         if (r < 0) {
1197                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1198                                 return r;
1199                         }
1200                 }
1201
1202                 if (s->runtime_journal) {
1203                         s->runtime_journal->metrics = s->metrics;
1204                         s->runtime_journal->compress = s->compress;
1205
1206                         fix_perms(s->runtime_journal, 0);
1207                 }
1208         }
1209
1210         return r;
1211 }
1212
1213 static int server_flush_to_var(Server *s) {
1214         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1215         Object *o = NULL;
1216         int r;
1217         sd_id128_t machine;
1218         sd_journal *j;
1219         usec_t ts;
1220
1221         assert(s);
1222
1223         if (!s->runtime_journal)
1224                 return 0;
1225
1226         ts = now(CLOCK_MONOTONIC);
1227         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1228                 return 0;
1229
1230         s->var_available_timestamp = ts;
1231
1232         system_journal_open(s);
1233
1234         if (!s->system_journal)
1235                 return 0;
1236
1237         r = sd_id128_get_machine(&machine);
1238         if (r < 0) {
1239                 log_error("Failed to get machine id: %s", strerror(-r));
1240                 return r;
1241         }
1242
1243         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1244         if (r < 0) {
1245                 log_error("Failed to read runtime journal: %s", strerror(-r));
1246                 return r;
1247         }
1248
1249         SD_JOURNAL_FOREACH(j) {
1250                 JournalFile *f;
1251
1252                 f = j->current_file;
1253                 assert(f && f->current_offset > 0);
1254
1255                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1256                 if (r < 0) {
1257                         log_error("Can't read entry: %s", strerror(-r));
1258                         goto finish;
1259                 }
1260
1261                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1262                 if (r == -E2BIG) {
1263                         log_info("Allocation limit reached.");
1264
1265                         journal_file_post_change(s->system_journal);
1266                         server_vacuum(s);
1267
1268                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1269                 }
1270
1271                 if (r < 0) {
1272                         log_error("Can't write entry: %s", strerror(-r));
1273                         goto finish;
1274                 }
1275         }
1276
1277 finish:
1278         journal_file_post_change(s->system_journal);
1279
1280         journal_file_close(s->runtime_journal);
1281         s->runtime_journal = NULL;
1282
1283         if (r >= 0) {
1284                 sd_id128_to_string(machine, path + 17);
1285                 rm_rf(path, false, true, false);
1286         }
1287
1288         return r;
1289 }
1290
1291 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1292         struct msghdr msghdr;
1293         struct iovec iovec;
1294         struct cmsghdr *cmsg;
1295         union {
1296                 struct cmsghdr cmsghdr;
1297                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1298                             CMSG_SPACE(sizeof(struct timeval))];
1299         } control;
1300         union sockaddr_union sa;
1301
1302         assert(s);
1303
1304         zero(msghdr);
1305
1306         zero(iovec);
1307         iovec.iov_base = (void*) buffer;
1308         iovec.iov_len = length;
1309         msghdr.msg_iov = &iovec;
1310         msghdr.msg_iovlen = 1;
1311
1312         zero(sa);
1313         sa.un.sun_family = AF_UNIX;
1314         strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1315         msghdr.msg_name = &sa;
1316         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1317
1318         zero(control);
1319         msghdr.msg_control = &control;
1320         msghdr.msg_controllen = sizeof(control);
1321
1322         cmsg = CMSG_FIRSTHDR(&msghdr);
1323         cmsg->cmsg_level = SOL_SOCKET;
1324         cmsg->cmsg_type = SCM_CREDENTIALS;
1325         cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1326         memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1327         msghdr.msg_controllen = cmsg->cmsg_len;
1328
1329         /* Forward the syslog message we received via /dev/log to
1330          * /run/systemd/syslog. Unfortunately we currently can't set
1331          * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1332
1333         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1334                 return;
1335
1336         if (errno == ESRCH) {
1337                 struct ucred u;
1338
1339                 /* Hmm, presumably the sender process vanished
1340                  * by now, so let's fix it as good as we
1341                  * can, and retry */
1342
1343                 u = *ucred;
1344                 u.pid = getpid();
1345                 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1346
1347                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1348                         return;
1349         }
1350
1351         log_debug("Failed to forward syslog message: %m");
1352 }
1353
1354 static int process_event(Server *s, struct epoll_event *ev) {
1355         assert(s);
1356
1357         if (ev->data.fd == s->signal_fd) {
1358                 struct signalfd_siginfo sfsi;
1359                 ssize_t n;
1360
1361                 if (ev->events != EPOLLIN) {
1362                         log_info("Got invalid event from epoll.");
1363                         return -EIO;
1364                 }
1365
1366                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1367                 if (n != sizeof(sfsi)) {
1368
1369                         if (n >= 0)
1370                                 return -EIO;
1371
1372                         if (errno == EINTR || errno == EAGAIN)
1373                                 return 0;
1374
1375                         return -errno;
1376                 }
1377
1378                 if (sfsi.ssi_signo == SIGUSR1) {
1379                         server_flush_to_var(s);
1380                         return 0;
1381                 }
1382
1383                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1384                 return 0;
1385
1386         } else if (ev->data.fd == s->native_fd ||
1387                    ev->data.fd == s->syslog_fd) {
1388
1389                 if (ev->events != EPOLLIN) {
1390                         log_info("Got invalid event from epoll.");
1391                         return -EIO;
1392                 }
1393
1394                 for (;;) {
1395                         struct msghdr msghdr;
1396                         struct iovec iovec;
1397                         struct ucred *ucred = NULL;
1398                         struct timeval *tv = NULL;
1399                         struct cmsghdr *cmsg;
1400                         union {
1401                                 struct cmsghdr cmsghdr;
1402                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1403                                             CMSG_SPACE(sizeof(struct timeval))];
1404                         } control;
1405                         ssize_t n;
1406                         int v;
1407
1408                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1409                                 log_error("SIOCINQ failed: %m");
1410                                 return -errno;
1411                         }
1412
1413                         if (v <= 0)
1414                                 return 1;
1415
1416                         if (s->buffer_size < (size_t) v) {
1417                                 void *b;
1418                                 size_t l;
1419
1420                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1421                                 b = realloc(s->buffer, l+1);
1422
1423                                 if (!b) {
1424                                         log_error("Couldn't increase buffer.");
1425                                         return -ENOMEM;
1426                                 }
1427
1428                                 s->buffer_size = l;
1429                                 s->buffer = b;
1430                         }
1431
1432                         zero(iovec);
1433                         iovec.iov_base = s->buffer;
1434                         iovec.iov_len = s->buffer_size;
1435
1436                         zero(control);
1437                         zero(msghdr);
1438                         msghdr.msg_iov = &iovec;
1439                         msghdr.msg_iovlen = 1;
1440                         msghdr.msg_control = &control;
1441                         msghdr.msg_controllen = sizeof(control);
1442
1443                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1444                         if (n < 0) {
1445
1446                                 if (errno == EINTR || errno == EAGAIN)
1447                                         return 1;
1448
1449                                 log_error("recvmsg() failed: %m");
1450                                 return -errno;
1451                         }
1452
1453                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1454
1455                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1456                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1457                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1458                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1459                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1460                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1461                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1462                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1463                         }
1464
1465                         if (ev->data.fd == s->syslog_fd) {
1466                                 char *e;
1467
1468                                 e = memchr(s->buffer, '\n', n);
1469                                 if (e)
1470                                         *e = 0;
1471                                 else
1472                                         s->buffer[n] = 0;
1473
1474                                 forward_syslog(s, s->buffer, n, ucred, tv);
1475                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1476                         } else
1477                                 process_native_message(s, s->buffer, n, ucred, tv);
1478                 }
1479
1480                 return 1;
1481
1482         } else if (ev->data.fd == s->stdout_fd) {
1483
1484                 if (ev->events != EPOLLIN) {
1485                         log_info("Got invalid event from epoll.");
1486                         return -EIO;
1487                 }
1488
1489                 stdout_stream_new(s);
1490                 return 1;
1491
1492         } else {
1493                 StdoutStream *stream;
1494
1495                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1496                         log_info("Got invalid event from epoll.");
1497                         return -EIO;
1498                 }
1499
1500                 /* If it is none of the well-known fds, it must be an
1501                  * stdout stream fd. Note that this is a bit ugly here
1502                  * (since we rely that none of the well-known fds
1503                  * could be interpreted as pointer), but nonetheless
1504                  * safe, since the well-known fds would never get an
1505                  * fd > 4096, i.e. beyond the first memory page */
1506
1507                 stream = ev->data.ptr;
1508
1509                 if (stdout_stream_process(stream) <= 0)
1510                         stdout_stream_free(stream);
1511
1512                 return 1;
1513         }
1514
1515         log_error("Unknown event.");
1516         return 0;
1517 }
1518
1519 static int open_syslog_socket(Server *s) {
1520         union sockaddr_union sa;
1521         int one, r;
1522         struct epoll_event ev;
1523         struct timeval tv;
1524
1525         assert(s);
1526
1527         if (s->syslog_fd < 0) {
1528
1529                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1530                 if (s->syslog_fd < 0) {
1531                         log_error("socket() failed: %m");
1532                         return -errno;
1533                 }
1534
1535                 zero(sa);
1536                 sa.un.sun_family = AF_UNIX;
1537                 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1538
1539                 unlink(sa.un.sun_path);
1540
1541                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1542                 if (r < 0) {
1543                         log_error("bind() failed: %m");
1544                         return -errno;
1545                 }
1546
1547                 chmod(sa.un.sun_path, 0666);
1548         }
1549
1550         one = 1;
1551         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1552         if (r < 0) {
1553                 log_error("SO_PASSCRED failed: %m");
1554                 return -errno;
1555         }
1556
1557         one = 1;
1558         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1559         if (r < 0) {
1560                 log_error("SO_TIMESTAMP failed: %m");
1561                 return -errno;
1562         }
1563
1564         /* Since we use the same socket for forwarding this to some
1565          * other syslog implementation, make sure we don't hang
1566          * forever */
1567         timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1568         if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1569                 log_error("SO_SNDTIMEO failed: %m");
1570                 return -errno;
1571         }
1572
1573         zero(ev);
1574         ev.events = EPOLLIN;
1575         ev.data.fd = s->syslog_fd;
1576         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1577                 log_error("Failed to add syslog server fd to epoll object: %m");
1578                 return -errno;
1579         }
1580
1581         return 0;
1582 }
1583
1584 static int open_native_socket(Server*s) {
1585         union sockaddr_union sa;
1586         int one, r;
1587         struct epoll_event ev;
1588
1589         assert(s);
1590
1591         if (s->native_fd < 0) {
1592
1593                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1594                 if (s->native_fd < 0) {
1595                         log_error("socket() failed: %m");
1596                         return -errno;
1597                 }
1598
1599                 zero(sa);
1600                 sa.un.sun_family = AF_UNIX;
1601                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1602
1603                 unlink(sa.un.sun_path);
1604
1605                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1606                 if (r < 0) {
1607                         log_error("bind() failed: %m");
1608                         return -errno;
1609                 }
1610
1611                 chmod(sa.un.sun_path, 0666);
1612         }
1613
1614         one = 1;
1615         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1616         if (r < 0) {
1617                 log_error("SO_PASSCRED failed: %m");
1618                 return -errno;
1619         }
1620
1621         one = 1;
1622         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1623         if (r < 0) {
1624                 log_error("SO_TIMESTAMP failed: %m");
1625                 return -errno;
1626         }
1627
1628         zero(ev);
1629         ev.events = EPOLLIN;
1630         ev.data.fd = s->native_fd;
1631         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1632                 log_error("Failed to add native server fd to epoll object: %m");
1633                 return -errno;
1634         }
1635
1636         return 0;
1637 }
1638
1639 static int open_stdout_socket(Server *s) {
1640         union sockaddr_union sa;
1641         int r;
1642         struct epoll_event ev;
1643
1644         assert(s);
1645
1646         if (s->stdout_fd < 0) {
1647
1648                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1649                 if (s->stdout_fd < 0) {
1650                         log_error("socket() failed: %m");
1651                         return -errno;
1652                 }
1653
1654                 zero(sa);
1655                 sa.un.sun_family = AF_UNIX;
1656                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1657
1658                 unlink(sa.un.sun_path);
1659
1660                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1661                 if (r < 0) {
1662                         log_error("bind() failed: %m");
1663                         return -errno;
1664                 }
1665
1666                 chmod(sa.un.sun_path, 0666);
1667
1668                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1669                         log_error("liste() failed: %m");
1670                         return -errno;
1671                 }
1672         }
1673
1674         zero(ev);
1675         ev.events = EPOLLIN;
1676         ev.data.fd = s->stdout_fd;
1677         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1678                 log_error("Failed to add stdout server fd to epoll object: %m");
1679                 return -errno;
1680         }
1681
1682         return 0;
1683 }
1684
1685 static int open_signalfd(Server *s) {
1686         sigset_t mask;
1687         struct epoll_event ev;
1688
1689         assert(s);
1690
1691         assert_se(sigemptyset(&mask) == 0);
1692         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1693         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1694
1695         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1696         if (s->signal_fd < 0) {
1697                 log_error("signalfd(): %m");
1698                 return -errno;
1699         }
1700
1701         zero(ev);
1702         ev.events = EPOLLIN;
1703         ev.data.fd = s->signal_fd;
1704
1705         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1706                 log_error("epoll_ctl(): %m");
1707                 return -errno;
1708         }
1709
1710         return 0;
1711 }
1712
1713 static int server_init(Server *s) {
1714         int n, r, fd;
1715
1716         assert(s);
1717
1718         zero(*s);
1719         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1720         s->metrics.max_size = DEFAULT_MAX_SIZE;
1721         s->metrics.min_size = DEFAULT_MIN_SIZE;
1722         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1723         s->metrics.max_use = DEFAULT_MAX_USE;
1724         s->compress = true;
1725
1726         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1727         if (!s->user_journals) {
1728                 log_error("Out of memory.");
1729                 return -ENOMEM;
1730         }
1731
1732         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1733         if (s->epoll_fd < 0) {
1734                 log_error("Failed to create epoll object: %m");
1735                 return -errno;
1736         }
1737
1738         n = sd_listen_fds(true);
1739         if (n < 0) {
1740                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1741                 return n;
1742         }
1743
1744         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1745
1746                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1747
1748                         if (s->native_fd >= 0) {
1749                                 log_error("Too many native sockets passed.");
1750                                 return -EINVAL;
1751                         }
1752
1753                         s->native_fd = fd;
1754
1755                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1756
1757                         if (s->stdout_fd >= 0) {
1758                                 log_error("Too many stdout sockets passed.");
1759                                 return -EINVAL;
1760                         }
1761
1762                         s->stdout_fd = fd;
1763
1764                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1765
1766                         if (s->syslog_fd >= 0) {
1767                                 log_error("Too many /dev/log sockets passed.");
1768                                 return -EINVAL;
1769                         }
1770
1771                         s->syslog_fd = fd;
1772
1773                 } else {
1774                         log_error("Unknown socket passed.");
1775                         return -EINVAL;
1776                 }
1777         }
1778
1779         r = open_syslog_socket(s);
1780         if (r < 0)
1781                 return r;
1782
1783         r = open_native_socket(s);
1784         if (r < 0)
1785                 return r;
1786
1787         r = open_stdout_socket(s);
1788         if (r < 0)
1789                 return r;
1790
1791         r = system_journal_open(s);
1792         if (r < 0)
1793                 return r;
1794
1795         r = open_signalfd(s);
1796         if (r < 0)
1797                 return r;
1798
1799         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1800         if (!s->rate_limit)
1801                 return -ENOMEM;
1802
1803         return 0;
1804 }
1805
1806 static void server_done(Server *s) {
1807         JournalFile *f;
1808         assert(s);
1809
1810         while (s->stdout_streams)
1811                 stdout_stream_free(s->stdout_streams);
1812
1813         if (s->system_journal)
1814                 journal_file_close(s->system_journal);
1815
1816         if (s->runtime_journal)
1817                 journal_file_close(s->runtime_journal);
1818
1819         while ((f = hashmap_steal_first(s->user_journals)))
1820                 journal_file_close(f);
1821
1822         hashmap_free(s->user_journals);
1823
1824         if (s->epoll_fd >= 0)
1825                 close_nointr_nofail(s->epoll_fd);
1826
1827         if (s->signal_fd >= 0)
1828                 close_nointr_nofail(s->signal_fd);
1829
1830         if (s->syslog_fd >= 0)
1831                 close_nointr_nofail(s->syslog_fd);
1832
1833         if (s->native_fd >= 0)
1834                 close_nointr_nofail(s->native_fd);
1835
1836         if (s->stdout_fd >= 0)
1837                 close_nointr_nofail(s->stdout_fd);
1838
1839         if (s->rate_limit)
1840                 journal_rate_limit_free(s->rate_limit);
1841
1842         free(s->buffer);
1843 }
1844
1845 int main(int argc, char *argv[]) {
1846         Server server;
1847         int r;
1848
1849         /* if (getppid() != 1) { */
1850         /*         log_error("This program should be invoked by init only."); */
1851         /*         return EXIT_FAILURE; */
1852         /* } */
1853
1854         if (argc > 1) {
1855                 log_error("This program does not take arguments.");
1856                 return EXIT_FAILURE;
1857         }
1858
1859         log_set_target(LOG_TARGET_CONSOLE);
1860         log_parse_environment();
1861         log_open();
1862
1863         umask(0022);
1864
1865         r = server_init(&server);
1866         if (r < 0)
1867                 goto finish;
1868
1869         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1870
1871         sd_notify(false,
1872                   "READY=1\n"
1873                   "STATUS=Processing requests...");
1874
1875         server_vacuum(&server);
1876         server_flush_to_var(&server);
1877
1878         for (;;) {
1879                 struct epoll_event event;
1880
1881                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1882                 if (r < 0) {
1883
1884                         if (errno == EINTR)
1885                                 continue;
1886
1887                         log_error("epoll_wait() failed: %m");
1888                         r = -errno;
1889                         goto finish;
1890                 } else if (r == 0)
1891                         break;
1892
1893                 r = process_event(&server, &event);
1894                 if (r < 0)
1895                         goto finish;
1896                 else if (r == 0)
1897                         break;
1898         }
1899
1900         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1901
1902 finish:
1903         sd_notify(false,
1904                   "STATUS=Shutting down...");
1905
1906         server_done(&server);
1907
1908         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1909 }