chiark / gitweb /
journald: flush /run to /var as soon as it becomes available
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "journal-internal.h"
45
/* Maximum number of per-user journal files kept open at once; before
 * opening another one, find_journal() closes whichever open file
 * hashmap_steal_first() hands back until the count is below this. */
#define USER_JOURNALS_MAX 1024
/* NOTE(review): presumably the cap on concurrently connected stdout
 * streams; it is not referenced in the code visible here — confirm. */
#define STDOUT_STREAMS_MAX 4096

/* Default rate-limit window and burst size. NOTE(review): not referenced
 * in the visible code; presumably used to create s->rate_limit — confirm. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long available_space() may return its cached result before it
 * re-scans the journal directory and calls fstatvfs() again. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)
53
typedef struct StdoutStream StdoutStream;

/* Global state of the journal daemon. */
typedef struct Server {
        /* NOTE(review): presumably the epoll instance, the signalfd and the
         * syslog / native / stdout-stream listening sockets; they are set up
         * outside the code visible here — confirm. */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* While runtime_journal is open, find_journal() routes every
         * message to it. Otherwise root/system messages go to
         * system_journal and other users to per-user files, which are
         * cached in user_journals keyed by uid (at most
         * USER_JOURNALS_MAX entries). */
        JournalFile *runtime_journal;
        JournalFile *system_journal;
        Hashmap *user_journals;

        /* Sequence number passed to journal_file_append_entry() for
         * every entry written. */
        uint64_t seqnum;

        /* NOTE(review): not used in the visible code; presumably a receive
         * buffer for incoming datagrams — confirm. */
        char *buffer;
        size_t buffer_size;

        /* Consulted by dispatch_message() through journal_rate_limit_test(). */
        JournalRateLimit *rate_limit;

        /* metrics and compress are copied into newly opened user journal
         * files. max_use and metrics.keep_free bound disk usage in
         * available_space() and server_vacuum(). */
        JournalMetrics metrics;
        uint64_t max_use;
        bool compress;

        /* Result of the last available_space() scan and the monotonic time
         * it was taken; server_vacuum() resets the timestamp to force a
         * rescan. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* NOTE(review): presumably the list of connected stdout streams
         * and its length — not maintained in the visible code. */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
84
/* Protocol state of a stdout stream connection. stdout_stream_line()
 * consumes one header line per state in this order: tag, default
 * priority (0-7), priority-prefix flag (0/1), tee-to-console flag (0/1).
 * After that, every further line is a log message (RUNNING). */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,
        STDOUT_STREAM_PRIORITY,
        STDOUT_STREAM_PRIORITY_PREFIX,
        STDOUT_STREAM_TEE_CONSOLE,
        STDOUT_STREAM_RUNNING
} StdoutStreamState;
92
/* One connected stdout stream client. */
struct StdoutStream {
        /* Back-pointer used when dispatching messages. */
        Server *server;
        StdoutStreamState state;

        /* Connection the log data is read from. */
        int fd;

        /* Peer credentials attached to every message from this stream. */
        struct ucred ucred;

        /* Values negotiated by the header lines: optional tag prepended
         * as "<tag>: ", default priority, whether "<N>" prefixes override
         * it, and whether lines are also copied to /dev/console. */
        char *tag;
        int priority;
        bool priority_prefix:1;
        bool tee_console:1;

        /* Partial-line buffer; length is the number of valid bytes. The
         * +1 leaves one spare byte beyond LINE_MAX (reads stop one short
         * of sizeof(buffer)). */
        char buffer[LINE_MAX+1];
        size_t length;

        LIST_FIELDS(StdoutStream, stdout_stream);
};
111
/* Defined further down in the file. dispatch_message_real() calls it
 * before every write. NOTE(review): presumably moves /run contents to
 * /var once /var is available — confirm against its definition. */
static int server_flush_to_var(Server *s);
113
114 static uint64_t available_space(Server *s) {
115         char ids[33];
116         sd_id128_t machine;
117         char *p;
118         const char *f;
119         struct statvfs ss;
120         uint64_t sum = 0, avail = 0, ss_avail = 0;
121         int r;
122         DIR *d;
123         usec_t ts = now(CLOCK_MONOTONIC);
124
125         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
126                 return s->cached_available_space;
127
128         r = sd_id128_get_machine(&machine);
129         if (r < 0)
130                 return 0;
131
132         if (s->system_journal)
133                 f = "/var/log/journal/";
134         else
135                 f = "/run/log/journal/";
136
137         p = strappend(f, sd_id128_to_string(machine, ids));
138         if (!p)
139                 return 0;
140
141         d = opendir(p);
142         free(p);
143
144         if (!d)
145                 return 0;
146
147         if (fstatvfs(dirfd(d), &ss) < 0)
148                 goto finish;
149
150         for (;;) {
151                 struct stat st;
152                 struct dirent buf, *de;
153                 int k;
154
155                 k = readdir_r(d, &buf, &de);
156                 if (k != 0) {
157                         r = -k;
158                         goto finish;
159                 }
160
161                 if (!de)
162                         break;
163
164                 if (!dirent_is_file_with_suffix(de, ".journal"))
165                         continue;
166
167                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
168                         continue;
169
170                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
171         }
172
173         avail = sum >= s->max_use ? 0 : s->max_use - sum;
174
175         ss_avail = ss.f_bsize * ss.f_bavail;
176
177         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
178
179         if (ss_avail < avail)
180                 avail = ss_avail;
181
182         s->cached_available_space = avail;
183         s->cached_available_space_timestamp = ts;
184
185 finish:
186         closedir(d);
187
188         return avail;
189 }
190
191 static void fix_perms(JournalFile *f, uid_t uid) {
192         acl_t acl;
193         acl_entry_t entry;
194         acl_permset_t permset;
195         int r;
196
197         assert(f);
198
199         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
200         if (r < 0)
201                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
202
203         if (uid <= 0)
204                 return;
205
206         acl = acl_get_fd(f->fd);
207         if (!acl) {
208                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
209                 return;
210         }
211
212         r = acl_find_uid(acl, uid, &entry);
213         if (r <= 0) {
214
215                 if (acl_create_entry(&acl, &entry) < 0 ||
216                     acl_set_tag_type(entry, ACL_USER) < 0 ||
217                     acl_set_qualifier(entry, &uid) < 0) {
218                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
219                         goto finish;
220                 }
221         }
222
223         if (acl_get_permset(entry, &permset) < 0 ||
224             acl_add_perm(permset, ACL_READ) < 0 ||
225             acl_calc_mask(&acl) < 0) {
226                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
227                 goto finish;
228         }
229
230         if (acl_set_fd(f->fd, acl) < 0)
231                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
232
233 finish:
234         acl_free(acl);
235 }
236
237 static JournalFile* find_journal(Server *s, uid_t uid) {
238         char *p;
239         int r;
240         JournalFile *f;
241         char ids[33];
242         sd_id128_t machine;
243
244         assert(s);
245
246         /* We split up user logs only on /var, not on /run. If the
247          * runtime file is open, we write to it exclusively, in order
248          * to guarantee proper order as soon as we flush /run to
249          * /var and close the runtime file. */
250
251         if (s->runtime_journal)
252                 return s->runtime_journal;
253
254         if (uid <= 0)
255                 return s->system_journal;
256
257         r = sd_id128_get_machine(&machine);
258         if (r < 0)
259                 return s->system_journal;
260
261         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
262         if (f)
263                 return f;
264
265         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
266                 return s->system_journal;
267
268         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
269                 /* Too many open? Then let's close one */
270                 f = hashmap_steal_first(s->user_journals);
271                 assert(f);
272                 journal_file_close(f);
273         }
274
275         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
276         free(p);
277
278         if (r < 0)
279                 return s->system_journal;
280
281         fix_perms(f, uid);
282         f->metrics = s->metrics;
283         f->compress = s->compress;
284
285         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
286         if (r < 0) {
287                 journal_file_close(f);
288                 return s->system_journal;
289         }
290
291         return f;
292 }
293
294 static void server_vacuum(Server *s) {
295         Iterator i;
296         void *k;
297         char *p;
298         char ids[33];
299         sd_id128_t machine;
300         int r;
301         JournalFile *f;
302
303         log_info("Rotating...");
304
305         if (s->runtime_journal) {
306                 r = journal_file_rotate(&s->runtime_journal);
307                 if (r < 0)
308                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
309         }
310
311         if (s->system_journal) {
312                 r = journal_file_rotate(&s->system_journal);
313                 if (r < 0)
314                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
315         }
316
317         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
318                 r = journal_file_rotate(&f);
319                 if (r < 0)
320                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
321                 else
322                         hashmap_replace(s->user_journals, k, f);
323         }
324
325         log_info("Vacuuming...");
326
327         r = sd_id128_get_machine(&machine);
328         if (r < 0) {
329                 log_error("Failed to get machine ID: %s", strerror(-r));
330                 return;
331         }
332
333         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
334                 log_error("Out of memory.");
335                 return;
336         }
337
338         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
339         if (r < 0 && r != -ENOENT)
340                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
341         free(p);
342
343         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
344                 log_error("Out of memory.");
345                 return;
346         }
347
348         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
349         if (r < 0 && r != -ENOENT)
350                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
351         free(p);
352
353         s->cached_available_space_timestamp = 0;
354 }
355
356 static char *shortened_cgroup_path(pid_t pid) {
357         int r;
358         char *process_path, *init_path, *path;
359
360         assert(pid > 0);
361
362         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
363         if (r < 0)
364                 return NULL;
365
366         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
367         if (r < 0) {
368                 free(process_path);
369                 return NULL;
370         }
371
372         if (streq(init_path, "/"))
373                 init_path[0] = 0;
374
375         if (startswith(process_path, init_path))
376                 path = process_path + strlen(init_path);
377         else
378                 path = process_path;
379
380         free(init_path);
381
382         return path;
383 }
384
385 static void dispatch_message_real(Server *s,
386                              struct iovec *iovec, unsigned n, unsigned m,
387                              struct ucred *ucred,
388                              struct timeval *tv) {
389
390         char *pid = NULL, *uid = NULL, *gid = NULL,
391                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
392                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
393                 *audit_session = NULL, *audit_loginuid = NULL,
394                 *exe = NULL, *cgroup = NULL;
395
396         char idbuf[33];
397         sd_id128_t id;
398         int r;
399         char *t;
400         uid_t loginuid = 0, realuid = 0;
401         JournalFile *f;
402         bool vacuumed = false;
403
404         assert(s);
405         assert(iovec);
406         assert(n > 0);
407         assert(n + 13 <= m);
408
409         if (ucred) {
410                 uint32_t session;
411                 char *path;
412
413                 realuid = ucred->uid;
414
415                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
416                         IOVEC_SET_STRING(iovec[n++], pid);
417
418                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
419                         IOVEC_SET_STRING(iovec[n++], uid);
420
421                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
422                         IOVEC_SET_STRING(iovec[n++], gid);
423
424                 r = get_process_comm(ucred->pid, &t);
425                 if (r >= 0) {
426                         comm = strappend("_COMM=", t);
427                         if (comm)
428                                 IOVEC_SET_STRING(iovec[n++], comm);
429                         free(t);
430                 }
431
432                 r = get_process_exe(ucred->pid, &t);
433                 if (r >= 0) {
434                         exe = strappend("_EXE=", t);
435                         if (comm)
436                                 IOVEC_SET_STRING(iovec[n++], exe);
437                         free(t);
438                 }
439
440                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
441                 if (r >= 0) {
442                         cmdline = strappend("_CMDLINE=", t);
443                         if (cmdline)
444                                 IOVEC_SET_STRING(iovec[n++], cmdline);
445                         free(t);
446                 }
447
448                 r = audit_session_from_pid(ucred->pid, &session);
449                 if (r >= 0)
450                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
451                                 IOVEC_SET_STRING(iovec[n++], audit_session);
452
453                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
454                 if (r >= 0)
455                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
456                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
457
458                 path = shortened_cgroup_path(ucred->pid);
459                 if (path) {
460                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
461                         if (cgroup)
462                                 IOVEC_SET_STRING(iovec[n++], cgroup);
463
464                         free(path);
465                 }
466         }
467
468         if (tv) {
469                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
470                              (unsigned long long) timeval_load(tv)) >= 0)
471                         IOVEC_SET_STRING(iovec[n++], source_time);
472         }
473
474         /* Note that strictly speaking storing the boot id here is
475          * redundant since the entry includes this in-line
476          * anyway. However, we need this indexed, too. */
477         r = sd_id128_get_boot(&id);
478         if (r >= 0)
479                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
480                         IOVEC_SET_STRING(iovec[n++], boot_id);
481
482         r = sd_id128_get_machine(&id);
483         if (r >= 0)
484                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
485                         IOVEC_SET_STRING(iovec[n++], machine_id);
486
487         t = gethostname_malloc();
488         if (t) {
489                 hostname = strappend("_HOSTNAME=", t);
490                 if (hostname)
491                         IOVEC_SET_STRING(iovec[n++], hostname);
492                 free(t);
493         }
494
495         assert(n <= m);
496
497         server_flush_to_var(s);
498
499 retry:
500         f = find_journal(s, realuid == 0 ? 0 : loginuid);
501         if (!f)
502                 log_warning("Dropping message, as we can't find a place to store the data.");
503         else {
504                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
505
506                 if (r == -E2BIG && !vacuumed) {
507                         log_info("Allocation limit reached.");
508
509                         server_vacuum(s);
510                         vacuumed = true;
511
512                         log_info("Retrying write.");
513                         goto retry;
514                 }
515
516                 if (r < 0)
517                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
518         }
519
520         free(pid);
521         free(uid);
522         free(gid);
523         free(comm);
524         free(exe);
525         free(cmdline);
526         free(source_time);
527         free(boot_id);
528         free(machine_id);
529         free(hostname);
530         free(audit_session);
531         free(audit_loginuid);
532         free(cgroup);
533 }
534
535 static void dispatch_message(Server *s,
536                              struct iovec *iovec, unsigned n, unsigned m,
537                              struct ucred *ucred,
538                              struct timeval *tv,
539                              int priority) {
540         int rl;
541         char *path, *c;
542
543         assert(s);
544         assert(iovec || n == 0);
545
546         if (n == 0)
547                 return;
548
549         if (!ucred)
550                 goto finish;
551
552         path = shortened_cgroup_path(ucred->pid);
553         if (!path)
554                 goto finish;
555
556         /* example: /user/lennart/3/foobar
557          *          /system/dbus.service/foobar
558          *
559          * So let's cut of everything past the third /, since that is
560          * wher user directories start */
561
562         c = strchr(path, '/');
563         if (c) {
564                 c = strchr(c+1, '/');
565                 if (c) {
566                         c = strchr(c+1, '/');
567                         if (c)
568                                 *c = 0;
569                 }
570         }
571
572         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
573
574         if (rl == 0) {
575                 free(path);
576                 return;
577         }
578
579         if (rl > 1) {
580                 int j = 0;
581                 char suppress_message[LINE_MAX];
582                 struct iovec suppress_iovec[15];
583
584                 /* Write a suppression message if we suppressed something */
585
586                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
587                 char_array_0(suppress_message);
588
589                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
590                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
591
592                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
593         }
594
595         free(path);
596
597 finish:
598         dispatch_message_real(s, iovec, n, m, ucred, tv);
599 }
600
601 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
602         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
603         struct iovec iovec[16];
604         unsigned n = 0;
605         int priority = LOG_USER | LOG_INFO;
606
607         assert(s);
608         assert(buf);
609
610         parse_syslog_priority((char**) &buf, &priority);
611         skip_syslog_date((char**) &buf);
612
613         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
614                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
615
616         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
617                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
618
619         message = strappend("MESSAGE=", buf);
620         if (message)
621                 IOVEC_SET_STRING(iovec[n++], message);
622
623         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
624
625         free(message);
626         free(syslog_facility);
627         free(syslog_priority);
628 }
629
/* Decides whether the l bytes at p form an acceptable client-supplied
 * field name.
 *
 * The rules loosely follow the POSIX recommendations for environment
 * variable names, with extra restrictions:
 *   - 1 to 64 characters;
 *   - only uppercase A-Z, digits 0-9 and '_';
 *   - must not start with a digit;
 *   - must not start with '_' (those names are reserved for trusted
 *     fields).
 *
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                const char c = p[i];
                const bool upper = c >= 'A' && c <= 'Z';
                const bool digit = c >= '0' && c <= '9';

                if (!upper && !digit && c != '_')
                        return false;
        }

        return true;
}
664
665 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
666         struct iovec *iovec = NULL;
667         unsigned n = 0, m = 0, j;
668         const char *p;
669         size_t remaining;
670         int priority = LOG_INFO;
671
672         assert(s);
673         assert(buffer || n == 0);
674
675         p = buffer;
676         remaining = buffer_size;
677
678         while (remaining > 0) {
679                 const char *e, *q;
680
681                 e = memchr(p, '\n', remaining);
682
683                 if (!e) {
684                         /* Trailing noise, let's ignore it, and flush what we collected */
685                         log_debug("Received message with trailing noise, ignoring.");
686                         break;
687                 }
688
689                 if (e == p) {
690                         /* Entry separator */
691                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
692                         n = 0;
693                         priority = LOG_INFO;
694
695                         p++;
696                         remaining--;
697                         continue;
698                 }
699
700                 if (*p == '.' || *p == '#') {
701                         /* Ignore control commands for now, and
702                          * comments too. */
703                         remaining -= (e - p) + 1;
704                         p = e + 1;
705                         continue;
706                 }
707
708                 /* A property follows */
709
710                 if (n+13 >= m) {
711                         struct iovec *c;
712                         unsigned u;
713
714                         u = MAX((n+13U) * 2U, 4U);
715                         c = realloc(iovec, u * sizeof(struct iovec));
716                         if (!c) {
717                                 log_error("Out of memory");
718                                 break;
719                         }
720
721                         iovec = c;
722                         m = u;
723                 }
724
725                 q = memchr(p, '=', e - p);
726                 if (q) {
727                         if (valid_user_field(p, q - p)) {
728                                 /* If the field name starts with an
729                                  * underscore, skip the variable,
730                                  * since that indidates a trusted
731                                  * field */
732                                 iovec[n].iov_base = (char*) p;
733                                 iovec[n].iov_len = e - p;
734                                 n++;
735
736                                 /* We need to determine the priority
737                                  * of this entry for the rate limiting
738                                  * logic */
739                                 if (e - p == 10 &&
740                                     memcmp(p, "PRIORITY=", 10) == 0 &&
741                                     p[10] >= '0' &&
742                                     p[10] <= '9')
743                                         priority = p[10] - '0';
744                         }
745
746                         remaining -= (e - p) + 1;
747                         p = e + 1;
748                         continue;
749                 } else {
750                         uint64_t l;
751                         char *k;
752
753                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
754                                 log_debug("Failed to parse message, ignoring.");
755                                 break;
756                         }
757
758                         memcpy(&l, e + 1, sizeof(uint64_t));
759                         l = le64toh(l);
760
761                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
762                             e[1+sizeof(uint64_t)+l] != '\n') {
763                                 log_debug("Failed to parse message, ignoring.");
764                                 break;
765                         }
766
767                         k = malloc((e - p) + 1 + l);
768                         if (!k) {
769                                 log_error("Out of memory");
770                                 break;
771                         }
772
773                         memcpy(k, p, e - p);
774                         k[e - p] = '=';
775                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
776
777                         if (valid_user_field(p, e - p)) {
778                                 iovec[n].iov_base = k;
779                                 iovec[n].iov_len = (e - p) + 1 + l;
780                                 n++;
781                         } else
782                                 free(k);
783
784                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
785                         p = e + 1 + sizeof(uint64_t) + l + 1;
786                 }
787         }
788
789         dispatch_message(s, iovec, n, m, ucred, tv, priority);
790
791         for (j = 0; j < n; j++)
792                 if (iovec[j].iov_base < buffer ||
793                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
794                         free(iovec[j].iov_base);
795 }
796
797 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
798         struct iovec iovec[15];
799         char *message = NULL, *syslog_priority = NULL;
800         unsigned n = 0;
801         size_t tag_len;
802         int priority;
803
804         assert(s);
805         assert(p);
806
807         priority = s->priority;
808
809         if (s->priority_prefix &&
810             l > 3 &&
811             p[0] == '<' &&
812             p[1] >= '0' && p[1] <= '7' &&
813             p[2] == '>') {
814
815                 priority = p[1] - '0';
816                 p += 3;
817                 l -= 3;
818         }
819
820         if (l <= 0)
821                 return 0;
822
823         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
824                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
825
826         tag_len = s->tag ? strlen(s->tag) + 2: 0;
827         message = malloc(8 + tag_len + l);
828         if (message) {
829                 memcpy(message, "MESSAGE=", 8);
830
831                 if (s->tag) {
832                         memcpy(message+8, s->tag, tag_len-2);
833                         memcpy(message+8+tag_len-2, ": ", 2);
834                 }
835
836                 memcpy(message+8+tag_len, p, l);
837                 iovec[n].iov_base = message;
838                 iovec[n].iov_len = 8+tag_len+l;
839                 n++;
840         }
841
842         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
843
844         if (s->tee_console) {
845                 int console;
846
847                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
848                 if (console >= 0) {
849                         n = 0;
850                         if (s->tag) {
851                                 IOVEC_SET_STRING(iovec[n++], s->tag);
852                                 IOVEC_SET_STRING(iovec[n++], ": ");
853                         }
854
855                         iovec[n].iov_base = (void*) p;
856                         iovec[n].iov_len = l;
857                         n++;
858
859                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
860
861                         writev(console, iovec, n);
862                 }
863         }
864
865         free(message);
866         free(syslog_priority);
867
868         return 0;
869 }
870
871 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
872         assert(s);
873         assert(p);
874
875         while (l > 0 && strchr(WHITESPACE, *p)) {
876                 l--;
877                 p++;
878         }
879
880         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
881                 l--;
882
883         switch (s->state) {
884
885         case STDOUT_STREAM_TAG:
886
887                 if (l > 0) {
888                         s->tag = strndup(p, l);
889                         if (!s->tag) {
890                                 log_error("Out of memory");
891                                 return -EINVAL;
892                         }
893                 }
894
895                 s->state = STDOUT_STREAM_PRIORITY;
896                 return 0;
897
898         case STDOUT_STREAM_PRIORITY:
899                 if (l != 1 || *p < '0' || *p > '7') {
900                         log_warning("Failed to parse log priority line.");
901                         return -EINVAL;
902                 }
903
904                 s->priority = *p - '0';
905                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
906                 return 0;
907
908         case STDOUT_STREAM_PRIORITY_PREFIX:
909                 if (l != 1 || *p < '0' || *p > '1') {
910                         log_warning("Failed to parse priority prefix line.");
911                         return -EINVAL;
912                 }
913
914                 s->priority_prefix = *p - '0';
915                 s->state = STDOUT_STREAM_TEE_CONSOLE;
916                 return 0;
917
918         case STDOUT_STREAM_TEE_CONSOLE:
919                 if (l != 1 || *p < '0' || *p > '1') {
920                         log_warning("Failed to parse tee to console line.");
921                         return -EINVAL;
922                 }
923
924                 s->tee_console = *p - '0';
925                 s->state = STDOUT_STREAM_RUNNING;
926                 return 0;
927
928         case STDOUT_STREAM_RUNNING:
929                 return stdout_stream_log(s, p, l);
930         }
931
932         assert_not_reached("Unknown stream state");
933 }
934
935 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
936         char *p;
937         size_t remaining;
938         int r;
939
940         assert(s);
941
942         p = s->buffer;
943         remaining = s->length;
944         for (;;) {
945                 char *end;
946                 size_t skip;
947
948                 end = memchr(p, '\n', remaining);
949                 if (!end) {
950                         if (remaining >= LINE_MAX) {
951                                 end = p + LINE_MAX;
952                                 skip = LINE_MAX;
953                         } else
954                                 break;
955                 } else
956                         skip = end - p + 1;
957
958                 r = stdout_stream_line(s, p, end - p);
959                 if (r < 0)
960                         return r;
961
962                 remaining -= skip;
963                 p += skip;
964         }
965
966         if (force_flush && remaining > 0) {
967                 r = stdout_stream_line(s, p, remaining);
968                 if (r < 0)
969                         return r;
970
971                 p += remaining;
972                 remaining = 0;
973         }
974
975         if (p > s->buffer) {
976                 memmove(s->buffer, p, remaining);
977                 s->length = remaining;
978         }
979
980         return 0;
981 }
982
983 static int stdout_stream_process(StdoutStream *s) {
984         ssize_t l;
985         int r;
986
987         assert(s);
988
989         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
990         if (l < 0) {
991
992                 if (errno == EAGAIN)
993                         return 0;
994
995                 log_warning("Failed to read from stream: %m");
996                 return -errno;
997         }
998
999         if (l == 0) {
1000                 r = stdout_stream_scan(s, true);
1001                 if (r < 0)
1002                         return r;
1003
1004                 return 0;
1005         }
1006
1007         s->length += l;
1008         r = stdout_stream_scan(s, false);
1009         if (r < 0)
1010                 return r;
1011
1012         return 1;
1013
1014 }
1015
1016 static void stdout_stream_free(StdoutStream *s) {
1017         assert(s);
1018
1019         if (s->server) {
1020                 assert(s->server->n_stdout_streams > 0);
1021                 s->server->n_stdout_streams --;
1022                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1023         }
1024
1025         if (s->fd >= 0) {
1026                 if (s->server)
1027                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1028
1029                 close_nointr_nofail(s->fd);
1030         }
1031
1032         free(s->tag);
1033         free(s);
1034 }
1035
1036 static int stdout_stream_new(Server *s) {
1037         StdoutStream *stream;
1038         int fd, r;
1039         socklen_t len;
1040         struct epoll_event ev;
1041
1042         assert(s);
1043
1044         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1045         if (fd < 0) {
1046                 if (errno == EAGAIN)
1047                         return 0;
1048
1049                 log_error("Failed to accept stdout connection: %m");
1050                 return -errno;
1051         }
1052
1053         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1054                 log_warning("Too many stdout streams, refusing connection.");
1055                 close_nointr_nofail(fd);
1056                 return 0;
1057         }
1058
1059         stream = new0(StdoutStream, 1);
1060         if (!stream) {
1061                 log_error("Out of memory.");
1062                 close_nointr_nofail(fd);
1063                 return -ENOMEM;
1064         }
1065
1066         stream->fd = fd;
1067
1068         len = sizeof(stream->ucred);
1069         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1070                 log_error("Failed to determine peer credentials: %m");
1071                 r = -errno;
1072                 goto fail;
1073         }
1074
1075         if (shutdown(fd, SHUT_WR) < 0) {
1076                 log_error("Failed to shutdown writing side of socket: %m");
1077                 r = -errno;
1078                 goto fail;
1079         }
1080
1081         zero(ev);
1082         ev.data.ptr = stream;
1083         ev.events = EPOLLIN;
1084         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1085                 log_error("Failed to add stream to event loop: %m");
1086                 r = -errno;
1087                 goto fail;
1088         }
1089
1090         stream->server = s;
1091         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1092         s->n_stdout_streams ++;
1093
1094         return 0;
1095
1096 fail:
1097         stdout_stream_free(stream);
1098         return r;
1099 }
1100
1101 static int system_journal_open(Server *s) {
1102         int r;
1103         char *fn;
1104         sd_id128_t machine;
1105         char ids[33];
1106
1107         r = sd_id128_get_machine(&machine);
1108         if (r < 0)
1109                 return r;
1110
1111         sd_id128_to_string(machine, ids);
1112
1113         if (!s->system_journal) {
1114
1115                 /* First try to create the machine path, but not the prefix */
1116                 fn = strappend("/var/log/journal/", ids);
1117                 if (!fn)
1118                         return -ENOMEM;
1119                 (void) mkdir(fn, 0755);
1120                 free(fn);
1121
1122                 /* The create the system journal file */
1123                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1124                 if (!fn)
1125                         return -ENOMEM;
1126
1127                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1128                 free(fn);
1129
1130                 if (r >= 0) {
1131                         s->system_journal->metrics = s->metrics;
1132                         s->system_journal->compress = s->compress;
1133
1134                         fix_perms(s->system_journal, 0);
1135                 } else if (r < 0) {
1136
1137                         if (r == -ENOENT)
1138                                 r = 0;
1139                         else {
1140                                 log_error("Failed to open system journal: %s", strerror(-r));
1141                                 return r;
1142                         }
1143                 }
1144         }
1145
1146         if (!s->runtime_journal) {
1147
1148                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1149                 if (!fn)
1150                         return -ENOMEM;
1151
1152                 if (s->system_journal) {
1153
1154                         /* Try to open the runtime journal, but only
1155                          * if it already exists, so that we can flush
1156                          * it into the system journal */
1157
1158                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1159                         free(fn);
1160
1161                         if (r < 0) {
1162
1163                                 if (r == -ENOENT)
1164                                         r = 0;
1165                                 else {
1166                                         log_error("Failed to open runtime journal: %s", strerror(-r));
1167                                         return r;
1168                                 }
1169                         }
1170
1171                 } else {
1172
1173                         /* OK, we really need the runtime journal, so create
1174                          * it if necessary. */
1175
1176                         (void) mkdir_parents(fn, 0755);
1177                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1178                         free(fn);
1179
1180                         if (r < 0) {
1181                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1182                                 return r;
1183                         }
1184                 }
1185
1186                 if (s->runtime_journal) {
1187                         s->runtime_journal->metrics = s->metrics;
1188                         s->runtime_journal->compress = s->compress;
1189
1190                         fix_perms(s->runtime_journal, 0);
1191                 }
1192         }
1193
1194         return r;
1195 }
1196
1197 static int server_flush_to_var(Server *s) {
1198         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1199         Object *o = NULL;
1200         int r;
1201         sd_id128_t machine;
1202         sd_journal *j;
1203
1204         assert(s);
1205
1206         system_journal_open(s);
1207
1208         if (!s->system_journal || !s->runtime_journal)
1209                 return 0;
1210
1211         r = sd_id128_get_machine(&machine);
1212         if (r < 0) {
1213                 log_error("Failed to get machine id: %s", strerror(-r));
1214                 return r;
1215         }
1216
1217         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1218         if (r < 0) {
1219                 log_error("Failed to read runtime journal: %s", strerror(-r));
1220                 return r;
1221         }
1222
1223         SD_JOURNAL_FOREACH(j) {
1224                 JournalFile *f;
1225
1226                 f = j->current_file;
1227                 assert(f && f->current_offset > 0);
1228
1229                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1230                 if (r < 0) {
1231                         log_error("Can't read entry: %s", strerror(-r));
1232                         goto finish;
1233                 }
1234
1235                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1236                 if (r == -E2BIG) {
1237                         log_info("Allocation limit reached.");
1238
1239                         journal_file_post_change(s->system_journal);
1240                         server_vacuum(s);
1241
1242                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1243                 }
1244
1245                 if (r < 0) {
1246                         log_error("Can't write entry: %s", strerror(-r));
1247                         goto finish;
1248                 }
1249         }
1250
1251 finish:
1252         journal_file_post_change(s->system_journal);
1253
1254         journal_file_close(s->runtime_journal);
1255         s->runtime_journal = NULL;
1256
1257         if (r >= 0) {
1258                 sd_id128_to_string(machine, path + 17);
1259                 rm_rf(path, false, true, false);
1260         }
1261
1262         return r;
1263 }
1264
1265 static int process_event(Server *s, struct epoll_event *ev) {
1266         assert(s);
1267
1268         if (ev->data.fd == s->signal_fd) {
1269                 struct signalfd_siginfo sfsi;
1270                 ssize_t n;
1271
1272                 if (ev->events != EPOLLIN) {
1273                         log_info("Got invalid event from epoll.");
1274                         return -EIO;
1275                 }
1276
1277                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1278                 if (n != sizeof(sfsi)) {
1279
1280                         if (n >= 0)
1281                                 return -EIO;
1282
1283                         if (errno == EINTR || errno == EAGAIN)
1284                                 return 0;
1285
1286                         return -errno;
1287                 }
1288
1289                 if (sfsi.ssi_signo == SIGUSR1) {
1290                         server_flush_to_var(s);
1291                         return 0;
1292                 }
1293
1294                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1295                 return 0;
1296
1297         } else if (ev->data.fd == s->native_fd ||
1298                    ev->data.fd == s->syslog_fd) {
1299
1300                 if (ev->events != EPOLLIN) {
1301                         log_info("Got invalid event from epoll.");
1302                         return -EIO;
1303                 }
1304
1305                 for (;;) {
1306                         struct msghdr msghdr;
1307                         struct iovec iovec;
1308                         struct ucred *ucred = NULL;
1309                         struct timeval *tv = NULL;
1310                         struct cmsghdr *cmsg;
1311                         union {
1312                                 struct cmsghdr cmsghdr;
1313                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1314                                             CMSG_SPACE(sizeof(struct timeval))];
1315                         } control;
1316                         ssize_t n;
1317                         int v;
1318
1319                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1320                                 log_error("SIOCINQ failed: %m");
1321                                 return -errno;
1322                         }
1323
1324                         if (v <= 0)
1325                                 return 1;
1326
1327                         if (s->buffer_size < (size_t) v) {
1328                                 void *b;
1329                                 size_t l;
1330
1331                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1332                                 b = realloc(s->buffer, l+1);
1333
1334                                 if (!b) {
1335                                         log_error("Couldn't increase buffer.");
1336                                         return -ENOMEM;
1337                                 }
1338
1339                                 s->buffer_size = l;
1340                                 s->buffer = b;
1341                         }
1342
1343                         zero(iovec);
1344                         iovec.iov_base = s->buffer;
1345                         iovec.iov_len = s->buffer_size;
1346
1347                         zero(control);
1348                         zero(msghdr);
1349                         msghdr.msg_iov = &iovec;
1350                         msghdr.msg_iovlen = 1;
1351                         msghdr.msg_control = &control;
1352                         msghdr.msg_controllen = sizeof(control);
1353
1354                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1355                         if (n < 0) {
1356
1357                                 if (errno == EINTR || errno == EAGAIN)
1358                                         return 1;
1359
1360                                 log_error("recvmsg() failed: %m");
1361                                 return -errno;
1362                         }
1363
1364                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1365
1366                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1367                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1368                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1369                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1370                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1371                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1372                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1373                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1374                         }
1375
1376                         if (ev->data.fd == s->syslog_fd) {
1377                                 char *e;
1378
1379                                 e = memchr(s->buffer, '\n', n);
1380                                 if (e)
1381                                         *e = 0;
1382                                 else
1383                                         s->buffer[n] = 0;
1384
1385                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1386                         } else
1387                                 process_native_message(s, s->buffer, n, ucred, tv);
1388                 }
1389
1390                 return 1;
1391
1392         } else if (ev->data.fd == s->stdout_fd) {
1393
1394                 if (ev->events != EPOLLIN) {
1395                         log_info("Got invalid event from epoll.");
1396                         return -EIO;
1397                 }
1398
1399                 stdout_stream_new(s);
1400                 return 1;
1401
1402         } else {
1403                 StdoutStream *stream;
1404
1405                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1406                         log_info("Got invalid event from epoll.");
1407                         return -EIO;
1408                 }
1409
1410                 /* If it is none of the well-known fds, it must be an
1411                  * stdout stream fd. Note that this is a bit ugly here
1412                  * (since we rely that none of the well-known fds
1413                  * could be interpreted as pointer), but nonetheless
1414                  * safe, since the well-known fds would never get an
1415                  * fd > 4096, i.e. beyond the first memory page */
1416
1417                 stream = ev->data.ptr;
1418
1419                 if (stdout_stream_process(stream) <= 0)
1420                         stdout_stream_free(stream);
1421
1422                 return 1;
1423         }
1424
1425         log_error("Unknown event.");
1426         return 0;
1427 }
1428
1429 static int open_syslog_socket(Server *s) {
1430         union sockaddr_union sa;
1431         int one, r;
1432         struct epoll_event ev;
1433
1434         assert(s);
1435
1436         if (s->syslog_fd < 0) {
1437
1438                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1439                 if (s->syslog_fd < 0) {
1440                         log_error("socket() failed: %m");
1441                         return -errno;
1442                 }
1443
1444                 zero(sa);
1445                 sa.un.sun_family = AF_UNIX;
1446                 strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1447
1448                 unlink(sa.un.sun_path);
1449
1450                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1451                 if (r < 0) {
1452                         log_error("bind() failed: %m");
1453                         return -errno;
1454                 }
1455
1456                 chmod(sa.un.sun_path, 0666);
1457         }
1458
1459         one = 1;
1460         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1461         if (r < 0) {
1462                 log_error("SO_PASSCRED failed: %m");
1463                 return -errno;
1464         }
1465
1466         one = 1;
1467         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1468         if (r < 0) {
1469                 log_error("SO_TIMESTAMP failed: %m");
1470                 return -errno;
1471         }
1472
1473         zero(ev);
1474         ev.events = EPOLLIN;
1475         ev.data.fd = s->syslog_fd;
1476         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1477                 log_error("Failed to add syslog server fd to epoll object: %m");
1478                 return -errno;
1479         }
1480
1481         return 0;
1482 }
1483
1484 static int open_native_socket(Server*s) {
1485         union sockaddr_union sa;
1486         int one, r;
1487         struct epoll_event ev;
1488
1489         assert(s);
1490
1491         if (s->native_fd < 0) {
1492
1493                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1494                 if (s->native_fd < 0) {
1495                         log_error("socket() failed: %m");
1496                         return -errno;
1497                 }
1498
1499                 zero(sa);
1500                 sa.un.sun_family = AF_UNIX;
1501                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1502
1503                 unlink(sa.un.sun_path);
1504
1505                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1506                 if (r < 0) {
1507                         log_error("bind() failed: %m");
1508                         return -errno;
1509                 }
1510
1511                 chmod(sa.un.sun_path, 0666);
1512         }
1513
1514         one = 1;
1515         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1516         if (r < 0) {
1517                 log_error("SO_PASSCRED failed: %m");
1518                 return -errno;
1519         }
1520
1521         one = 1;
1522         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1523         if (r < 0) {
1524                 log_error("SO_TIMESTAMP failed: %m");
1525                 return -errno;
1526         }
1527
1528         zero(ev);
1529         ev.events = EPOLLIN;
1530         ev.data.fd = s->native_fd;
1531         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1532                 log_error("Failed to add native server fd to epoll object: %m");
1533                 return -errno;
1534         }
1535
1536         return 0;
1537 }
1538
1539 static int open_stdout_socket(Server *s) {
1540         union sockaddr_union sa;
1541         int r;
1542         struct epoll_event ev;
1543
1544         assert(s);
1545
1546         if (s->stdout_fd < 0) {
1547
1548                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1549                 if (s->stdout_fd < 0) {
1550                         log_error("socket() failed: %m");
1551                         return -errno;
1552                 }
1553
1554                 zero(sa);
1555                 sa.un.sun_family = AF_UNIX;
1556                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1557
1558                 unlink(sa.un.sun_path);
1559
1560                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1561                 if (r < 0) {
1562                         log_error("bind() failed: %m");
1563                         return -errno;
1564                 }
1565
1566                 chmod(sa.un.sun_path, 0666);
1567
1568                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1569                         log_error("liste() failed: %m");
1570                         return -errno;
1571                 }
1572         }
1573
1574         zero(ev);
1575         ev.events = EPOLLIN;
1576         ev.data.fd = s->stdout_fd;
1577         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1578                 log_error("Failed to add stdout server fd to epoll object: %m");
1579                 return -errno;
1580         }
1581
1582         return 0;
1583 }
1584
1585 static int open_signalfd(Server *s) {
1586         sigset_t mask;
1587         struct epoll_event ev;
1588
1589         assert(s);
1590
1591         assert_se(sigemptyset(&mask) == 0);
1592         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1593         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1594
1595         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1596         if (s->signal_fd < 0) {
1597                 log_error("signalfd(): %m");
1598                 return -errno;
1599         }
1600
1601         zero(ev);
1602         ev.events = EPOLLIN;
1603         ev.data.fd = s->signal_fd;
1604
1605         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1606                 log_error("epoll_ctl(): %m");
1607                 return -errno;
1608         }
1609
1610         return 0;
1611 }
1612
1613 static int server_init(Server *s) {
1614         int n, r, fd;
1615
1616         assert(s);
1617
1618         zero(*s);
1619         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1620         s->metrics.max_size = DEFAULT_MAX_SIZE;
1621         s->metrics.min_size = DEFAULT_MIN_SIZE;
1622         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1623         s->max_use = DEFAULT_MAX_USE;
1624         s->compress = true;
1625
1626         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1627         if (!s->user_journals) {
1628                 log_error("Out of memory.");
1629                 return -ENOMEM;
1630         }
1631
1632         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1633         if (s->epoll_fd < 0) {
1634                 log_error("Failed to create epoll object: %m");
1635                 return -errno;
1636         }
1637
1638         n = sd_listen_fds(true);
1639         if (n < 0) {
1640                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1641                 return n;
1642         }
1643
1644         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1645
1646                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1647
1648                         if (s->native_fd >= 0) {
1649                                 log_error("Too many native sockets passed.");
1650                                 return -EINVAL;
1651                         }
1652
1653                         s->native_fd = fd;
1654
1655                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1656
1657                         if (s->stdout_fd >= 0) {
1658                                 log_error("Too many stdout sockets passed.");
1659                                 return -EINVAL;
1660                         }
1661
1662                         s->stdout_fd = fd;
1663
1664                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1665
1666                         if (s->syslog_fd >= 0) {
1667                                 log_error("Too many /dev/log sockets passed.");
1668                                 return -EINVAL;
1669                         }
1670
1671                         s->syslog_fd = fd;
1672
1673                 } else {
1674                         log_error("Unknown socket passed.");
1675                         return -EINVAL;
1676                 }
1677         }
1678
1679         r = open_syslog_socket(s);
1680         if (r < 0)
1681                 return r;
1682
1683         r = open_native_socket(s);
1684         if (r < 0)
1685                 return r;
1686
1687         r = open_stdout_socket(s);
1688         if (r < 0)
1689                 return r;
1690
1691         r = system_journal_open(s);
1692         if (r < 0)
1693                 return r;
1694
1695         r = open_signalfd(s);
1696         if (r < 0)
1697                 return r;
1698
1699         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1700         if (!s->rate_limit)
1701                 return -ENOMEM;
1702
1703         return 0;
1704 }
1705
1706 static void server_done(Server *s) {
1707         JournalFile *f;
1708         assert(s);
1709
1710         while (s->stdout_streams)
1711                 stdout_stream_free(s->stdout_streams);
1712
1713         if (s->system_journal)
1714                 journal_file_close(s->system_journal);
1715
1716         if (s->runtime_journal)
1717                 journal_file_close(s->runtime_journal);
1718
1719         while ((f = hashmap_steal_first(s->user_journals)))
1720                 journal_file_close(f);
1721
1722         hashmap_free(s->user_journals);
1723
1724         if (s->epoll_fd >= 0)
1725                 close_nointr_nofail(s->epoll_fd);
1726
1727         if (s->signal_fd >= 0)
1728                 close_nointr_nofail(s->signal_fd);
1729
1730         if (s->syslog_fd >= 0)
1731                 close_nointr_nofail(s->syslog_fd);
1732
1733         if (s->native_fd >= 0)
1734                 close_nointr_nofail(s->native_fd);
1735
1736         if (s->stdout_fd >= 0)
1737                 close_nointr_nofail(s->stdout_fd);
1738
1739         if (s->rate_limit)
1740                 journal_rate_limit_free(s->rate_limit);
1741 }
1742
1743 int main(int argc, char *argv[]) {
1744         Server server;
1745         int r;
1746
1747         /* if (getppid() != 1) { */
1748         /*         log_error("This program should be invoked by init only."); */
1749         /*         return EXIT_FAILURE; */
1750         /* } */
1751
1752         if (argc > 1) {
1753                 log_error("This program does not take arguments.");
1754                 return EXIT_FAILURE;
1755         }
1756
1757         log_set_target(LOG_TARGET_CONSOLE);
1758         log_set_max_level(LOG_DEBUG);
1759         log_parse_environment();
1760         log_open();
1761
1762         umask(0022);
1763
1764         r = server_init(&server);
1765         if (r < 0)
1766                 goto finish;
1767
1768         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1769
1770         sd_notify(false,
1771                   "READY=1\n"
1772                   "STATUS=Processing requests...");
1773
1774         server_vacuum(&server);
1775         server_flush_to_var(&server);
1776
1777         for (;;) {
1778                 struct epoll_event event;
1779
1780                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1781                 if (r < 0) {
1782
1783                         if (errno == EINTR)
1784                                 continue;
1785
1786                         log_error("epoll_wait() failed: %m");
1787                         r = -errno;
1788                         goto finish;
1789                 } else if (r == 0)
1790                         break;
1791
1792                 r = process_event(&server, &event);
1793                 if (r < 0)
1794                         goto finish;
1795                 else if (r == 0)
1796                         break;
1797         }
1798
1799         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1800
1801 finish:
1802         sd_notify(false,
1803                   "STATUS=Shutting down...");
1804
1805         server_done(&server);
1806
1807         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1808 }