chiark / gitweb /
52cdc7058c998a5fafd776685569b64622bbf10f
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "journal-internal.h"
45
/* Upper bound on the number of per-user journal files kept open at the
 * same time; find_journal() closes entries until it is below this. */
46 #define USER_JOURNALS_MAX 1024
/* NOTE(review): presumably the cap on concurrent stdout stream
 * connections; its use is outside the visible part of the file. */
47 #define STDOUT_STREAMS_MAX 4096
48
/* NOTE(review): presumably the defaults handed to the rate limiter;
 * their use is outside the visible part of the file. */
49 #define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
50 #define DEFAULT_RATE_LIMIT_BURST 200
51
/* How long available_space() may return its cached result before it
 * rescans the journal directory. */
52 #define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)
53
/* NOTE(review): presumably the retry interval for checking whether /var
 * is usable; its use is outside the visible part of the file. */
54 #define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)
55
56 typedef struct StdoutStream StdoutStream;
57
/* Global state of the journal daemon. */
58 typedef struct Server {
        /* File descriptors owned by the daemon. NOTE(review): judging by
         * the names these are the epoll instance, the signalfd and the
         * three input sockets; their setup is not in the visible code. */
59         int epoll_fd;
60         int signal_fd;
61         int syslog_fd;
62         int native_fd;
63         int stdout_fd;
64
        /* While runtime_journal is open, find_journal() directs all
         * entries to it; otherwise they go to system_journal or to a
         * per-user file from user_journals (hashmap keyed by UID). */
65         JournalFile *runtime_journal;
66         JournalFile *system_journal;
67         Hashmap *user_journals;
68
        /* Passed by reference to journal_file_append_entry() for every
         * entry written. */
69         uint64_t seqnum;
70
        /* NOTE(review): presumably a receive buffer; not used in the
         * visible part of the file. */
71         char *buffer;
72         size_t buffer_size;
73
        /* Consulted by dispatch_message() before an entry is stored. */
74         JournalRateLimit *rate_limit;
75
        /* Size policy: metrics and compress are copied into every newly
         * opened per-user journal; max_use and metrics.keep_free bound
         * disk usage in available_space() and server_vacuum(). */
76         JournalMetrics metrics;
77         uint64_t max_use;
78         bool compress;
79
        /* Result of the last available_space() computation and the
         * CLOCK_MONOTONIC time it was taken; server_vacuum() resets the
         * timestamp to force a recomputation. */
80         uint64_t cached_available_space;
81         usec_t cached_available_space_timestamp;
82
        /* NOTE(review): presumably the time of the last /var check; not
         * used in the visible part of the file. */
83         uint64_t var_available_timestamp;
84
        /* Linked list of stdout stream connections. NOTE(review):
         * n_stdout_streams is presumably its length; confirm. */
85         LIST_HEAD(StdoutStream, stdout_streams);
86         unsigned n_stdout_streams;
87 } Server;
88
/* Parser state of a stdout stream connection. stdout_stream_line()
 * consumes one header line per state, in the order listed, before the
 * stream starts carrying log data. */
89 typedef enum StdoutStreamState {
90         STDOUT_STREAM_TAG,              /* next line: tag prepended to each message (may be empty) */
91         STDOUT_STREAM_PRIORITY,         /* next line: default priority, one digit '0'..'7' */
92         STDOUT_STREAM_PRIORITY_PREFIX,  /* next line: '0' or '1', whether "<N>" prefixes are parsed */
93         STDOUT_STREAM_TEE_CONSOLE,      /* next line: '0' or '1', whether lines are copied to /dev/console */
94         STDOUT_STREAM_RUNNING           /* header complete: every further line is logged */
95 } StdoutStreamState;
96
/* State of a single stdout stream connection. */
97 struct StdoutStream {
98         Server *server;            /* server the parsed lines are dispatched to */
99         StdoutStreamState state;   /* which header line (or log data) is expected next */
100
101         int fd;                    /* descriptor the stream is read from */
102
103         struct ucred ucred;        /* sender credentials passed along with every entry */
104
        /* Settings taken from the header lines, see stdout_stream_line(). */
105         char *tag;
106         int priority;
107         bool priority_prefix:1;
108         bool tee_console:1;
109
        /* Bytes received but not yet consumed as complete lines; length is
         * the number of valid bytes at the start of buffer. */
110         char buffer[LINE_MAX+1];
111         size_t length;
112
113         LIST_FIELDS(StdoutStream, stdout_stream);
114 };
115
116 static int server_flush_to_var(Server *s);
117
118 static uint64_t available_space(Server *s) {
119         char ids[33];
120         sd_id128_t machine;
121         char *p;
122         const char *f;
123         struct statvfs ss;
124         uint64_t sum = 0, avail = 0, ss_avail = 0;
125         int r;
126         DIR *d;
127         usec_t ts = now(CLOCK_MONOTONIC);
128
129         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
130                 return s->cached_available_space;
131
132         r = sd_id128_get_machine(&machine);
133         if (r < 0)
134                 return 0;
135
136         if (s->system_journal)
137                 f = "/var/log/journal/";
138         else
139                 f = "/run/log/journal/";
140
141         p = strappend(f, sd_id128_to_string(machine, ids));
142         if (!p)
143                 return 0;
144
145         d = opendir(p);
146         free(p);
147
148         if (!d)
149                 return 0;
150
151         if (fstatvfs(dirfd(d), &ss) < 0)
152                 goto finish;
153
154         for (;;) {
155                 struct stat st;
156                 struct dirent buf, *de;
157                 int k;
158
159                 k = readdir_r(d, &buf, &de);
160                 if (k != 0) {
161                         r = -k;
162                         goto finish;
163                 }
164
165                 if (!de)
166                         break;
167
168                 if (!dirent_is_file_with_suffix(de, ".journal"))
169                         continue;
170
171                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
172                         continue;
173
174                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
175         }
176
177         avail = sum >= s->max_use ? 0 : s->max_use - sum;
178
179         ss_avail = ss.f_bsize * ss.f_bavail;
180
181         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
182
183         if (ss_avail < avail)
184                 avail = ss_avail;
185
186         s->cached_available_space = avail;
187         s->cached_available_space_timestamp = ts;
188
189 finish:
190         closedir(d);
191
192         return avail;
193 }
194
195 static void fix_perms(JournalFile *f, uid_t uid) {
196         acl_t acl;
197         acl_entry_t entry;
198         acl_permset_t permset;
199         int r;
200
201         assert(f);
202
203         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
204         if (r < 0)
205                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
206
207         if (uid <= 0)
208                 return;
209
210         acl = acl_get_fd(f->fd);
211         if (!acl) {
212                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
213                 return;
214         }
215
216         r = acl_find_uid(acl, uid, &entry);
217         if (r <= 0) {
218
219                 if (acl_create_entry(&acl, &entry) < 0 ||
220                     acl_set_tag_type(entry, ACL_USER) < 0 ||
221                     acl_set_qualifier(entry, &uid) < 0) {
222                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
223                         goto finish;
224                 }
225         }
226
227         if (acl_get_permset(entry, &permset) < 0 ||
228             acl_add_perm(permset, ACL_READ) < 0 ||
229             acl_calc_mask(&acl) < 0) {
230                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
231                 goto finish;
232         }
233
234         if (acl_set_fd(f->fd, acl) < 0)
235                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
236
237 finish:
238         acl_free(acl);
239 }
240
241 static JournalFile* find_journal(Server *s, uid_t uid) {
242         char *p;
243         int r;
244         JournalFile *f;
245         char ids[33];
246         sd_id128_t machine;
247
248         assert(s);
249
250         /* We split up user logs only on /var, not on /run. If the
251          * runtime file is open, we write to it exclusively, in order
252          * to guarantee proper order as soon as we flush /run to
253          * /var and close the runtime file. */
254
255         if (s->runtime_journal)
256                 return s->runtime_journal;
257
258         if (uid <= 0)
259                 return s->system_journal;
260
261         r = sd_id128_get_machine(&machine);
262         if (r < 0)
263                 return s->system_journal;
264
265         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
266         if (f)
267                 return f;
268
269         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
270                 return s->system_journal;
271
272         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
273                 /* Too many open? Then let's close one */
274                 f = hashmap_steal_first(s->user_journals);
275                 assert(f);
276                 journal_file_close(f);
277         }
278
279         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
280         free(p);
281
282         if (r < 0)
283                 return s->system_journal;
284
285         fix_perms(f, uid);
286         f->metrics = s->metrics;
287         f->compress = s->compress;
288
289         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
290         if (r < 0) {
291                 journal_file_close(f);
292                 return s->system_journal;
293         }
294
295         return f;
296 }
297
298 static void server_vacuum(Server *s) {
299         Iterator i;
300         void *k;
301         char *p;
302         char ids[33];
303         sd_id128_t machine;
304         int r;
305         JournalFile *f;
306
307         log_info("Rotating...");
308
309         if (s->runtime_journal) {
310                 r = journal_file_rotate(&s->runtime_journal);
311                 if (r < 0)
312                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
313         }
314
315         if (s->system_journal) {
316                 r = journal_file_rotate(&s->system_journal);
317                 if (r < 0)
318                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
319         }
320
321         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
322                 r = journal_file_rotate(&f);
323                 if (r < 0)
324                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
325                 else
326                         hashmap_replace(s->user_journals, k, f);
327         }
328
329         log_info("Vacuuming...");
330
331         r = sd_id128_get_machine(&machine);
332         if (r < 0) {
333                 log_error("Failed to get machine ID: %s", strerror(-r));
334                 return;
335         }
336
337         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
338                 log_error("Out of memory.");
339                 return;
340         }
341
342         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
343         if (r < 0 && r != -ENOENT)
344                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
345         free(p);
346
347         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
348                 log_error("Out of memory.");
349                 return;
350         }
351
352         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
353         if (r < 0 && r != -ENOENT)
354                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
355         free(p);
356
357         s->cached_available_space_timestamp = 0;
358 }
359
360 static char *shortened_cgroup_path(pid_t pid) {
361         int r;
362         char *process_path, *init_path, *path;
363
364         assert(pid > 0);
365
366         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
367         if (r < 0)
368                 return NULL;
369
370         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
371         if (r < 0) {
372                 free(process_path);
373                 return NULL;
374         }
375
376         if (streq(init_path, "/"))
377                 init_path[0] = 0;
378
379         if (startswith(process_path, init_path))
380                 path = process_path + strlen(init_path);
381         else
382                 path = process_path;
383
384         free(init_path);
385
386         return path;
387 }
388
389 static void dispatch_message_real(Server *s,
390                              struct iovec *iovec, unsigned n, unsigned m,
391                              struct ucred *ucred,
392                              struct timeval *tv) {
393
394         char *pid = NULL, *uid = NULL, *gid = NULL,
395                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
396                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
397                 *audit_session = NULL, *audit_loginuid = NULL,
398                 *exe = NULL, *cgroup = NULL;
399
400         char idbuf[33];
401         sd_id128_t id;
402         int r;
403         char *t;
404         uid_t loginuid = 0, realuid = 0;
405         JournalFile *f;
406         bool vacuumed = false;
407
408         assert(s);
409         assert(iovec);
410         assert(n > 0);
411         assert(n + 13 <= m);
412
413         if (ucred) {
414                 uint32_t session;
415                 char *path;
416
417                 realuid = ucred->uid;
418
419                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
420                         IOVEC_SET_STRING(iovec[n++], pid);
421
422                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
423                         IOVEC_SET_STRING(iovec[n++], uid);
424
425                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
426                         IOVEC_SET_STRING(iovec[n++], gid);
427
428                 r = get_process_comm(ucred->pid, &t);
429                 if (r >= 0) {
430                         comm = strappend("_COMM=", t);
431                         if (comm)
432                                 IOVEC_SET_STRING(iovec[n++], comm);
433                         free(t);
434                 }
435
436                 r = get_process_exe(ucred->pid, &t);
437                 if (r >= 0) {
438                         exe = strappend("_EXE=", t);
439                         if (comm)
440                                 IOVEC_SET_STRING(iovec[n++], exe);
441                         free(t);
442                 }
443
444                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
445                 if (r >= 0) {
446                         cmdline = strappend("_CMDLINE=", t);
447                         if (cmdline)
448                                 IOVEC_SET_STRING(iovec[n++], cmdline);
449                         free(t);
450                 }
451
452                 r = audit_session_from_pid(ucred->pid, &session);
453                 if (r >= 0)
454                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
455                                 IOVEC_SET_STRING(iovec[n++], audit_session);
456
457                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
458                 if (r >= 0)
459                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
460                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
461
462                 path = shortened_cgroup_path(ucred->pid);
463                 if (path) {
464                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
465                         if (cgroup)
466                                 IOVEC_SET_STRING(iovec[n++], cgroup);
467
468                         free(path);
469                 }
470         }
471
472         if (tv) {
473                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
474                              (unsigned long long) timeval_load(tv)) >= 0)
475                         IOVEC_SET_STRING(iovec[n++], source_time);
476         }
477
478         /* Note that strictly speaking storing the boot id here is
479          * redundant since the entry includes this in-line
480          * anyway. However, we need this indexed, too. */
481         r = sd_id128_get_boot(&id);
482         if (r >= 0)
483                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
484                         IOVEC_SET_STRING(iovec[n++], boot_id);
485
486         r = sd_id128_get_machine(&id);
487         if (r >= 0)
488                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
489                         IOVEC_SET_STRING(iovec[n++], machine_id);
490
491         t = gethostname_malloc();
492         if (t) {
493                 hostname = strappend("_HOSTNAME=", t);
494                 if (hostname)
495                         IOVEC_SET_STRING(iovec[n++], hostname);
496                 free(t);
497         }
498
499         assert(n <= m);
500
501         server_flush_to_var(s);
502
503 retry:
504         f = find_journal(s, realuid == 0 ? 0 : loginuid);
505         if (!f)
506                 log_warning("Dropping message, as we can't find a place to store the data.");
507         else {
508                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
509
510                 if (r == -E2BIG && !vacuumed) {
511                         log_info("Allocation limit reached.");
512
513                         server_vacuum(s);
514                         vacuumed = true;
515
516                         log_info("Retrying write.");
517                         goto retry;
518                 }
519
520                 if (r < 0)
521                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
522         }
523
524         free(pid);
525         free(uid);
526         free(gid);
527         free(comm);
528         free(exe);
529         free(cmdline);
530         free(source_time);
531         free(boot_id);
532         free(machine_id);
533         free(hostname);
534         free(audit_session);
535         free(audit_loginuid);
536         free(cgroup);
537 }
538
539 static void dispatch_message(Server *s,
540                              struct iovec *iovec, unsigned n, unsigned m,
541                              struct ucred *ucred,
542                              struct timeval *tv,
543                              int priority) {
544         int rl;
545         char *path, *c;
546
547         assert(s);
548         assert(iovec || n == 0);
549
550         if (n == 0)
551                 return;
552
553         if (!ucred)
554                 goto finish;
555
556         path = shortened_cgroup_path(ucred->pid);
557         if (!path)
558                 goto finish;
559
560         /* example: /user/lennart/3/foobar
561          *          /system/dbus.service/foobar
562          *
563          * So let's cut of everything past the third /, since that is
564          * wher user directories start */
565
566         c = strchr(path, '/');
567         if (c) {
568                 c = strchr(c+1, '/');
569                 if (c) {
570                         c = strchr(c+1, '/');
571                         if (c)
572                                 *c = 0;
573                 }
574         }
575
576         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
577
578         if (rl == 0) {
579                 free(path);
580                 return;
581         }
582
583         if (rl > 1) {
584                 int j = 0;
585                 char suppress_message[LINE_MAX];
586                 struct iovec suppress_iovec[15];
587
588                 /* Write a suppression message if we suppressed something */
589
590                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
591                 char_array_0(suppress_message);
592
593                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
594                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
595
596                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
597         }
598
599         free(path);
600
601 finish:
602         dispatch_message_real(s, iovec, n, m, ucred, tv);
603 }
604
605 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
606         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
607         struct iovec iovec[16];
608         unsigned n = 0;
609         int priority = LOG_USER | LOG_INFO;
610
611         assert(s);
612         assert(buf);
613
614         parse_syslog_priority((char**) &buf, &priority);
615         skip_syslog_date((char**) &buf);
616
617         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
618                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
619
620         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
621                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
622
623         message = strappend("MESSAGE=", buf);
624         if (message)
625                 IOVEC_SET_STRING(iovec[n++], message);
626
627         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
628
629         free(message);
630         free(syslog_facility);
631         free(syslog_priority);
632 }
633
/* Checks whether the l bytes at p form a field name that clients are
 * allowed to set.
 *
 * This roughly follows the POSIX recommendations for environment
 * variable names, with a few extra restrictions:
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
 *
 * A valid name is 1..64 characters long, consists only of A-Z, 0-9 and
 * '_', and starts with neither a digit nor an underscore (names with a
 * leading underscore are protected). */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        /* Neither empty nor overly long names */
        if (l <= 0 || l > 64)
                return false;

        /* Protected namespace */
        if (p[0] == '_')
                return false;

        /* No leading digit */
        if (p[0] >= '0' && p[0] <= '9')
                return false;

        for (i = 0; i < l; i++) {
                char ch = p[i];
                bool is_upper = ch >= 'A' && ch <= 'Z';
                bool is_digit = ch >= '0' && ch <= '9';

                if (!is_upper && !is_digit && ch != '_')
                        return false;
        }

        return true;
}
668
669 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
670         struct iovec *iovec = NULL;
671         unsigned n = 0, m = 0, j;
672         const char *p;
673         size_t remaining;
674         int priority = LOG_INFO;
675
676         assert(s);
677         assert(buffer || n == 0);
678
679         p = buffer;
680         remaining = buffer_size;
681
682         while (remaining > 0) {
683                 const char *e, *q;
684
685                 e = memchr(p, '\n', remaining);
686
687                 if (!e) {
688                         /* Trailing noise, let's ignore it, and flush what we collected */
689                         log_debug("Received message with trailing noise, ignoring.");
690                         break;
691                 }
692
693                 if (e == p) {
694                         /* Entry separator */
695                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
696                         n = 0;
697                         priority = LOG_INFO;
698
699                         p++;
700                         remaining--;
701                         continue;
702                 }
703
704                 if (*p == '.' || *p == '#') {
705                         /* Ignore control commands for now, and
706                          * comments too. */
707                         remaining -= (e - p) + 1;
708                         p = e + 1;
709                         continue;
710                 }
711
712                 /* A property follows */
713
714                 if (n+13 >= m) {
715                         struct iovec *c;
716                         unsigned u;
717
718                         u = MAX((n+13U) * 2U, 4U);
719                         c = realloc(iovec, u * sizeof(struct iovec));
720                         if (!c) {
721                                 log_error("Out of memory");
722                                 break;
723                         }
724
725                         iovec = c;
726                         m = u;
727                 }
728
729                 q = memchr(p, '=', e - p);
730                 if (q) {
731                         if (valid_user_field(p, q - p)) {
732                                 /* If the field name starts with an
733                                  * underscore, skip the variable,
734                                  * since that indidates a trusted
735                                  * field */
736                                 iovec[n].iov_base = (char*) p;
737                                 iovec[n].iov_len = e - p;
738                                 n++;
739
740                                 /* We need to determine the priority
741                                  * of this entry for the rate limiting
742                                  * logic */
743                                 if (e - p == 10 &&
744                                     memcmp(p, "PRIORITY=", 10) == 0 &&
745                                     p[10] >= '0' &&
746                                     p[10] <= '9')
747                                         priority = p[10] - '0';
748                         }
749
750                         remaining -= (e - p) + 1;
751                         p = e + 1;
752                         continue;
753                 } else {
754                         uint64_t l;
755                         char *k;
756
757                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
758                                 log_debug("Failed to parse message, ignoring.");
759                                 break;
760                         }
761
762                         memcpy(&l, e + 1, sizeof(uint64_t));
763                         l = le64toh(l);
764
765                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
766                             e[1+sizeof(uint64_t)+l] != '\n') {
767                                 log_debug("Failed to parse message, ignoring.");
768                                 break;
769                         }
770
771                         k = malloc((e - p) + 1 + l);
772                         if (!k) {
773                                 log_error("Out of memory");
774                                 break;
775                         }
776
777                         memcpy(k, p, e - p);
778                         k[e - p] = '=';
779                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
780
781                         if (valid_user_field(p, e - p)) {
782                                 iovec[n].iov_base = k;
783                                 iovec[n].iov_len = (e - p) + 1 + l;
784                                 n++;
785                         } else
786                                 free(k);
787
788                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
789                         p = e + 1 + sizeof(uint64_t) + l + 1;
790                 }
791         }
792
793         dispatch_message(s, iovec, n, m, ucred, tv, priority);
794
795         for (j = 0; j < n; j++)
796                 if (iovec[j].iov_base < buffer ||
797                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
798                         free(iovec[j].iov_base);
799 }
800
801 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
802         struct iovec iovec[15];
803         char *message = NULL, *syslog_priority = NULL;
804         unsigned n = 0;
805         size_t tag_len;
806         int priority;
807
808         assert(s);
809         assert(p);
810
811         priority = s->priority;
812
813         if (s->priority_prefix &&
814             l > 3 &&
815             p[0] == '<' &&
816             p[1] >= '0' && p[1] <= '7' &&
817             p[2] == '>') {
818
819                 priority = p[1] - '0';
820                 p += 3;
821                 l -= 3;
822         }
823
824         if (l <= 0)
825                 return 0;
826
827         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
828                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
829
830         tag_len = s->tag ? strlen(s->tag) + 2: 0;
831         message = malloc(8 + tag_len + l);
832         if (message) {
833                 memcpy(message, "MESSAGE=", 8);
834
835                 if (s->tag) {
836                         memcpy(message+8, s->tag, tag_len-2);
837                         memcpy(message+8+tag_len-2, ": ", 2);
838                 }
839
840                 memcpy(message+8+tag_len, p, l);
841                 iovec[n].iov_base = message;
842                 iovec[n].iov_len = 8+tag_len+l;
843                 n++;
844         }
845
846         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
847
848         if (s->tee_console) {
849                 int console;
850
851                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
852                 if (console >= 0) {
853                         n = 0;
854                         if (s->tag) {
855                                 IOVEC_SET_STRING(iovec[n++], s->tag);
856                                 IOVEC_SET_STRING(iovec[n++], ": ");
857                         }
858
859                         iovec[n].iov_base = (void*) p;
860                         iovec[n].iov_len = l;
861                         n++;
862
863                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
864
865                         writev(console, iovec, n);
866                 }
867         }
868
869         free(message);
870         free(syslog_priority);
871
872         return 0;
873 }
874
875 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
876         assert(s);
877         assert(p);
878
879         while (l > 0 && strchr(WHITESPACE, *p)) {
880                 l--;
881                 p++;
882         }
883
884         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
885                 l--;
886
887         switch (s->state) {
888
889         case STDOUT_STREAM_TAG:
890
891                 if (l > 0) {
892                         s->tag = strndup(p, l);
893                         if (!s->tag) {
894                                 log_error("Out of memory");
895                                 return -EINVAL;
896                         }
897                 }
898
899                 s->state = STDOUT_STREAM_PRIORITY;
900                 return 0;
901
902         case STDOUT_STREAM_PRIORITY:
903                 if (l != 1 || *p < '0' || *p > '7') {
904                         log_warning("Failed to parse log priority line.");
905                         return -EINVAL;
906                 }
907
908                 s->priority = *p - '0';
909                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
910                 return 0;
911
912         case STDOUT_STREAM_PRIORITY_PREFIX:
913                 if (l != 1 || *p < '0' || *p > '1') {
914                         log_warning("Failed to parse priority prefix line.");
915                         return -EINVAL;
916                 }
917
918                 s->priority_prefix = *p - '0';
919                 s->state = STDOUT_STREAM_TEE_CONSOLE;
920                 return 0;
921
922         case STDOUT_STREAM_TEE_CONSOLE:
923                 if (l != 1 || *p < '0' || *p > '1') {
924                         log_warning("Failed to parse tee to console line.");
925                         return -EINVAL;
926                 }
927
928                 s->tee_console = *p - '0';
929                 s->state = STDOUT_STREAM_RUNNING;
930                 return 0;
931
932         case STDOUT_STREAM_RUNNING:
933                 return stdout_stream_log(s, p, l);
934         }
935
936         assert_not_reached("Unknown stream state");
937 }
938
939 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
940         char *p;
941         size_t remaining;
942         int r;
943
944         assert(s);
945
946         p = s->buffer;
947         remaining = s->length;
948         for (;;) {
949                 char *end;
950                 size_t skip;
951
952                 end = memchr(p, '\n', remaining);
953                 if (!end) {
954                         if (remaining >= LINE_MAX) {
955                                 end = p + LINE_MAX;
956                                 skip = LINE_MAX;
957                         } else
958                                 break;
959                 } else
960                         skip = end - p + 1;
961
962                 r = stdout_stream_line(s, p, end - p);
963                 if (r < 0)
964                         return r;
965
966                 remaining -= skip;
967                 p += skip;
968         }
969
970         if (force_flush && remaining > 0) {
971                 r = stdout_stream_line(s, p, remaining);
972                 if (r < 0)
973                         return r;
974
975                 p += remaining;
976                 remaining = 0;
977         }
978
979         if (p > s->buffer) {
980                 memmove(s->buffer, p, remaining);
981                 s->length = remaining;
982         }
983
984         return 0;
985 }
986
987 static int stdout_stream_process(StdoutStream *s) {
988         ssize_t l;
989         int r;
990
991         assert(s);
992
993         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
994         if (l < 0) {
995
996                 if (errno == EAGAIN)
997                         return 0;
998
999                 log_warning("Failed to read from stream: %m");
1000                 return -errno;
1001         }
1002
1003         if (l == 0) {
1004                 r = stdout_stream_scan(s, true);
1005                 if (r < 0)
1006                         return r;
1007
1008                 return 0;
1009         }
1010
1011         s->length += l;
1012         r = stdout_stream_scan(s, false);
1013         if (r < 0)
1014                 return r;
1015
1016         return 1;
1017
1018 }
1019
1020 static void stdout_stream_free(StdoutStream *s) {
1021         assert(s);
1022
1023         if (s->server) {
1024                 assert(s->server->n_stdout_streams > 0);
1025                 s->server->n_stdout_streams --;
1026                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1027         }
1028
1029         if (s->fd >= 0) {
1030                 if (s->server)
1031                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1032
1033                 close_nointr_nofail(s->fd);
1034         }
1035
1036         free(s->tag);
1037         free(s);
1038 }
1039
1040 static int stdout_stream_new(Server *s) {
1041         StdoutStream *stream;
1042         int fd, r;
1043         socklen_t len;
1044         struct epoll_event ev;
1045
1046         assert(s);
1047
1048         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1049         if (fd < 0) {
1050                 if (errno == EAGAIN)
1051                         return 0;
1052
1053                 log_error("Failed to accept stdout connection: %m");
1054                 return -errno;
1055         }
1056
1057         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1058                 log_warning("Too many stdout streams, refusing connection.");
1059                 close_nointr_nofail(fd);
1060                 return 0;
1061         }
1062
1063         stream = new0(StdoutStream, 1);
1064         if (!stream) {
1065                 log_error("Out of memory.");
1066                 close_nointr_nofail(fd);
1067                 return -ENOMEM;
1068         }
1069
1070         stream->fd = fd;
1071
1072         len = sizeof(stream->ucred);
1073         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1074                 log_error("Failed to determine peer credentials: %m");
1075                 r = -errno;
1076                 goto fail;
1077         }
1078
1079         if (shutdown(fd, SHUT_WR) < 0) {
1080                 log_error("Failed to shutdown writing side of socket: %m");
1081                 r = -errno;
1082                 goto fail;
1083         }
1084
1085         zero(ev);
1086         ev.data.ptr = stream;
1087         ev.events = EPOLLIN;
1088         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1089                 log_error("Failed to add stream to event loop: %m");
1090                 r = -errno;
1091                 goto fail;
1092         }
1093
1094         stream->server = s;
1095         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1096         s->n_stdout_streams ++;
1097
1098         return 0;
1099
1100 fail:
1101         stdout_stream_free(stream);
1102         return r;
1103 }
1104
1105 static int system_journal_open(Server *s) {
1106         int r;
1107         char *fn;
1108         sd_id128_t machine;
1109         char ids[33];
1110
1111         r = sd_id128_get_machine(&machine);
1112         if (r < 0)
1113                 return r;
1114
1115         sd_id128_to_string(machine, ids);
1116
1117         if (!s->system_journal) {
1118
1119                 /* First try to create the machine path, but not the prefix */
1120                 fn = strappend("/var/log/journal/", ids);
1121                 if (!fn)
1122                         return -ENOMEM;
1123                 (void) mkdir(fn, 0755);
1124                 free(fn);
1125
1126                 /* The create the system journal file */
1127                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1128                 if (!fn)
1129                         return -ENOMEM;
1130
1131                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1132                 free(fn);
1133
1134                 if (r >= 0) {
1135                         s->system_journal->metrics = s->metrics;
1136                         s->system_journal->compress = s->compress;
1137
1138                         fix_perms(s->system_journal, 0);
1139                 } else if (r < 0) {
1140
1141                         if (r == -ENOENT)
1142                                 r = 0;
1143                         else {
1144                                 log_error("Failed to open system journal: %s", strerror(-r));
1145                                 return r;
1146                         }
1147                 }
1148         }
1149
1150         if (!s->runtime_journal) {
1151
1152                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1153                 if (!fn)
1154                         return -ENOMEM;
1155
1156                 if (s->system_journal) {
1157
1158                         /* Try to open the runtime journal, but only
1159                          * if it already exists, so that we can flush
1160                          * it into the system journal */
1161
1162                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1163                         free(fn);
1164
1165                         if (r < 0) {
1166
1167                                 if (r == -ENOENT)
1168                                         r = 0;
1169                                 else {
1170                                         log_error("Failed to open runtime journal: %s", strerror(-r));
1171                                         return r;
1172                                 }
1173                         }
1174
1175                 } else {
1176
1177                         /* OK, we really need the runtime journal, so create
1178                          * it if necessary. */
1179
1180                         (void) mkdir_parents(fn, 0755);
1181                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1182                         free(fn);
1183
1184                         if (r < 0) {
1185                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1186                                 return r;
1187                         }
1188                 }
1189
1190                 if (s->runtime_journal) {
1191                         s->runtime_journal->metrics = s->metrics;
1192                         s->runtime_journal->compress = s->compress;
1193
1194                         fix_perms(s->runtime_journal, 0);
1195                 }
1196         }
1197
1198         return r;
1199 }
1200
1201 static int server_flush_to_var(Server *s) {
1202         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1203         Object *o = NULL;
1204         int r;
1205         sd_id128_t machine;
1206         sd_journal *j;
1207         usec_t ts;
1208
1209         assert(s);
1210
1211         if (!s->runtime_journal)
1212                 return 0;
1213
1214         ts = now(CLOCK_MONOTONIC);
1215         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1216                 return 0;
1217
1218         s->var_available_timestamp = ts;
1219
1220         system_journal_open(s);
1221
1222         if (!s->system_journal)
1223                 return 0;
1224
1225         r = sd_id128_get_machine(&machine);
1226         if (r < 0) {
1227                 log_error("Failed to get machine id: %s", strerror(-r));
1228                 return r;
1229         }
1230
1231         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1232         if (r < 0) {
1233                 log_error("Failed to read runtime journal: %s", strerror(-r));
1234                 return r;
1235         }
1236
1237         SD_JOURNAL_FOREACH(j) {
1238                 JournalFile *f;
1239
1240                 f = j->current_file;
1241                 assert(f && f->current_offset > 0);
1242
1243                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1244                 if (r < 0) {
1245                         log_error("Can't read entry: %s", strerror(-r));
1246                         goto finish;
1247                 }
1248
1249                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1250                 if (r == -E2BIG) {
1251                         log_info("Allocation limit reached.");
1252
1253                         journal_file_post_change(s->system_journal);
1254                         server_vacuum(s);
1255
1256                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1257                 }
1258
1259                 if (r < 0) {
1260                         log_error("Can't write entry: %s", strerror(-r));
1261                         goto finish;
1262                 }
1263         }
1264
1265 finish:
1266         journal_file_post_change(s->system_journal);
1267
1268         journal_file_close(s->runtime_journal);
1269         s->runtime_journal = NULL;
1270
1271         if (r >= 0) {
1272                 sd_id128_to_string(machine, path + 17);
1273                 rm_rf(path, false, true, false);
1274         }
1275
1276         return r;
1277 }
1278
1279 static int process_event(Server *s, struct epoll_event *ev) {
1280         assert(s);
1281
1282         if (ev->data.fd == s->signal_fd) {
1283                 struct signalfd_siginfo sfsi;
1284                 ssize_t n;
1285
1286                 if (ev->events != EPOLLIN) {
1287                         log_info("Got invalid event from epoll.");
1288                         return -EIO;
1289                 }
1290
1291                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1292                 if (n != sizeof(sfsi)) {
1293
1294                         if (n >= 0)
1295                                 return -EIO;
1296
1297                         if (errno == EINTR || errno == EAGAIN)
1298                                 return 0;
1299
1300                         return -errno;
1301                 }
1302
1303                 if (sfsi.ssi_signo == SIGUSR1) {
1304                         server_flush_to_var(s);
1305                         return 0;
1306                 }
1307
1308                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1309                 return 0;
1310
1311         } else if (ev->data.fd == s->native_fd ||
1312                    ev->data.fd == s->syslog_fd) {
1313
1314                 if (ev->events != EPOLLIN) {
1315                         log_info("Got invalid event from epoll.");
1316                         return -EIO;
1317                 }
1318
1319                 for (;;) {
1320                         struct msghdr msghdr;
1321                         struct iovec iovec;
1322                         struct ucred *ucred = NULL;
1323                         struct timeval *tv = NULL;
1324                         struct cmsghdr *cmsg;
1325                         union {
1326                                 struct cmsghdr cmsghdr;
1327                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1328                                             CMSG_SPACE(sizeof(struct timeval))];
1329                         } control;
1330                         ssize_t n;
1331                         int v;
1332
1333                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1334                                 log_error("SIOCINQ failed: %m");
1335                                 return -errno;
1336                         }
1337
1338                         if (v <= 0)
1339                                 return 1;
1340
1341                         if (s->buffer_size < (size_t) v) {
1342                                 void *b;
1343                                 size_t l;
1344
1345                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1346                                 b = realloc(s->buffer, l+1);
1347
1348                                 if (!b) {
1349                                         log_error("Couldn't increase buffer.");
1350                                         return -ENOMEM;
1351                                 }
1352
1353                                 s->buffer_size = l;
1354                                 s->buffer = b;
1355                         }
1356
1357                         zero(iovec);
1358                         iovec.iov_base = s->buffer;
1359                         iovec.iov_len = s->buffer_size;
1360
1361                         zero(control);
1362                         zero(msghdr);
1363                         msghdr.msg_iov = &iovec;
1364                         msghdr.msg_iovlen = 1;
1365                         msghdr.msg_control = &control;
1366                         msghdr.msg_controllen = sizeof(control);
1367
1368                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1369                         if (n < 0) {
1370
1371                                 if (errno == EINTR || errno == EAGAIN)
1372                                         return 1;
1373
1374                                 log_error("recvmsg() failed: %m");
1375                                 return -errno;
1376                         }
1377
1378                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1379
1380                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1381                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1382                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1383                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1384                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1385                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1386                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1387                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1388                         }
1389
1390                         if (ev->data.fd == s->syslog_fd) {
1391                                 char *e;
1392
1393                                 e = memchr(s->buffer, '\n', n);
1394                                 if (e)
1395                                         *e = 0;
1396                                 else
1397                                         s->buffer[n] = 0;
1398
1399                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1400                         } else
1401                                 process_native_message(s, s->buffer, n, ucred, tv);
1402                 }
1403
1404                 return 1;
1405
1406         } else if (ev->data.fd == s->stdout_fd) {
1407
1408                 if (ev->events != EPOLLIN) {
1409                         log_info("Got invalid event from epoll.");
1410                         return -EIO;
1411                 }
1412
1413                 stdout_stream_new(s);
1414                 return 1;
1415
1416         } else {
1417                 StdoutStream *stream;
1418
1419                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1420                         log_info("Got invalid event from epoll.");
1421                         return -EIO;
1422                 }
1423
1424                 /* If it is none of the well-known fds, it must be an
1425                  * stdout stream fd. Note that this is a bit ugly here
1426                  * (since we rely that none of the well-known fds
1427                  * could be interpreted as pointer), but nonetheless
1428                  * safe, since the well-known fds would never get an
1429                  * fd > 4096, i.e. beyond the first memory page */
1430
1431                 stream = ev->data.ptr;
1432
1433                 if (stdout_stream_process(stream) <= 0)
1434                         stdout_stream_free(stream);
1435
1436                 return 1;
1437         }
1438
1439         log_error("Unknown event.");
1440         return 0;
1441 }
1442
1443 static int open_syslog_socket(Server *s) {
1444         union sockaddr_union sa;
1445         int one, r;
1446         struct epoll_event ev;
1447
1448         assert(s);
1449
1450         if (s->syslog_fd < 0) {
1451
1452                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1453                 if (s->syslog_fd < 0) {
1454                         log_error("socket() failed: %m");
1455                         return -errno;
1456                 }
1457
1458                 zero(sa);
1459                 sa.un.sun_family = AF_UNIX;
1460                 strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1461
1462                 unlink(sa.un.sun_path);
1463
1464                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1465                 if (r < 0) {
1466                         log_error("bind() failed: %m");
1467                         return -errno;
1468                 }
1469
1470                 chmod(sa.un.sun_path, 0666);
1471         }
1472
1473         one = 1;
1474         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1475         if (r < 0) {
1476                 log_error("SO_PASSCRED failed: %m");
1477                 return -errno;
1478         }
1479
1480         one = 1;
1481         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1482         if (r < 0) {
1483                 log_error("SO_TIMESTAMP failed: %m");
1484                 return -errno;
1485         }
1486
1487         zero(ev);
1488         ev.events = EPOLLIN;
1489         ev.data.fd = s->syslog_fd;
1490         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1491                 log_error("Failed to add syslog server fd to epoll object: %m");
1492                 return -errno;
1493         }
1494
1495         return 0;
1496 }
1497
1498 static int open_native_socket(Server*s) {
1499         union sockaddr_union sa;
1500         int one, r;
1501         struct epoll_event ev;
1502
1503         assert(s);
1504
1505         if (s->native_fd < 0) {
1506
1507                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1508                 if (s->native_fd < 0) {
1509                         log_error("socket() failed: %m");
1510                         return -errno;
1511                 }
1512
1513                 zero(sa);
1514                 sa.un.sun_family = AF_UNIX;
1515                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1516
1517                 unlink(sa.un.sun_path);
1518
1519                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1520                 if (r < 0) {
1521                         log_error("bind() failed: %m");
1522                         return -errno;
1523                 }
1524
1525                 chmod(sa.un.sun_path, 0666);
1526         }
1527
1528         one = 1;
1529         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1530         if (r < 0) {
1531                 log_error("SO_PASSCRED failed: %m");
1532                 return -errno;
1533         }
1534
1535         one = 1;
1536         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1537         if (r < 0) {
1538                 log_error("SO_TIMESTAMP failed: %m");
1539                 return -errno;
1540         }
1541
1542         zero(ev);
1543         ev.events = EPOLLIN;
1544         ev.data.fd = s->native_fd;
1545         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1546                 log_error("Failed to add native server fd to epoll object: %m");
1547                 return -errno;
1548         }
1549
1550         return 0;
1551 }
1552
1553 static int open_stdout_socket(Server *s) {
1554         union sockaddr_union sa;
1555         int r;
1556         struct epoll_event ev;
1557
1558         assert(s);
1559
1560         if (s->stdout_fd < 0) {
1561
1562                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1563                 if (s->stdout_fd < 0) {
1564                         log_error("socket() failed: %m");
1565                         return -errno;
1566                 }
1567
1568                 zero(sa);
1569                 sa.un.sun_family = AF_UNIX;
1570                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1571
1572                 unlink(sa.un.sun_path);
1573
1574                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1575                 if (r < 0) {
1576                         log_error("bind() failed: %m");
1577                         return -errno;
1578                 }
1579
1580                 chmod(sa.un.sun_path, 0666);
1581
1582                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1583                         log_error("liste() failed: %m");
1584                         return -errno;
1585                 }
1586         }
1587
1588         zero(ev);
1589         ev.events = EPOLLIN;
1590         ev.data.fd = s->stdout_fd;
1591         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1592                 log_error("Failed to add stdout server fd to epoll object: %m");
1593                 return -errno;
1594         }
1595
1596         return 0;
1597 }
1598
1599 static int open_signalfd(Server *s) {
1600         sigset_t mask;
1601         struct epoll_event ev;
1602
1603         assert(s);
1604
1605         assert_se(sigemptyset(&mask) == 0);
1606         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1607         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1608
1609         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1610         if (s->signal_fd < 0) {
1611                 log_error("signalfd(): %m");
1612                 return -errno;
1613         }
1614
1615         zero(ev);
1616         ev.events = EPOLLIN;
1617         ev.data.fd = s->signal_fd;
1618
1619         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1620                 log_error("epoll_ctl(): %m");
1621                 return -errno;
1622         }
1623
1624         return 0;
1625 }
1626
1627 static int server_init(Server *s) {
1628         int n, r, fd;
1629
1630         assert(s);
1631
1632         zero(*s);
1633         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1634         s->metrics.max_size = DEFAULT_MAX_SIZE;
1635         s->metrics.min_size = DEFAULT_MIN_SIZE;
1636         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1637         s->max_use = DEFAULT_MAX_USE;
1638         s->compress = true;
1639
1640         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1641         if (!s->user_journals) {
1642                 log_error("Out of memory.");
1643                 return -ENOMEM;
1644         }
1645
1646         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1647         if (s->epoll_fd < 0) {
1648                 log_error("Failed to create epoll object: %m");
1649                 return -errno;
1650         }
1651
1652         n = sd_listen_fds(true);
1653         if (n < 0) {
1654                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1655                 return n;
1656         }
1657
1658         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1659
1660                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1661
1662                         if (s->native_fd >= 0) {
1663                                 log_error("Too many native sockets passed.");
1664                                 return -EINVAL;
1665                         }
1666
1667                         s->native_fd = fd;
1668
1669                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1670
1671                         if (s->stdout_fd >= 0) {
1672                                 log_error("Too many stdout sockets passed.");
1673                                 return -EINVAL;
1674                         }
1675
1676                         s->stdout_fd = fd;
1677
1678                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1679
1680                         if (s->syslog_fd >= 0) {
1681                                 log_error("Too many /dev/log sockets passed.");
1682                                 return -EINVAL;
1683                         }
1684
1685                         s->syslog_fd = fd;
1686
1687                 } else {
1688                         log_error("Unknown socket passed.");
1689                         return -EINVAL;
1690                 }
1691         }
1692
1693         r = open_syslog_socket(s);
1694         if (r < 0)
1695                 return r;
1696
1697         r = open_native_socket(s);
1698         if (r < 0)
1699                 return r;
1700
1701         r = open_stdout_socket(s);
1702         if (r < 0)
1703                 return r;
1704
1705         r = system_journal_open(s);
1706         if (r < 0)
1707                 return r;
1708
1709         r = open_signalfd(s);
1710         if (r < 0)
1711                 return r;
1712
1713         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1714         if (!s->rate_limit)
1715                 return -ENOMEM;
1716
1717         return 0;
1718 }
1719
1720 static void server_done(Server *s) {
1721         JournalFile *f;
1722         assert(s);
1723
1724         while (s->stdout_streams)
1725                 stdout_stream_free(s->stdout_streams);
1726
1727         if (s->system_journal)
1728                 journal_file_close(s->system_journal);
1729
1730         if (s->runtime_journal)
1731                 journal_file_close(s->runtime_journal);
1732
1733         while ((f = hashmap_steal_first(s->user_journals)))
1734                 journal_file_close(f);
1735
1736         hashmap_free(s->user_journals);
1737
1738         if (s->epoll_fd >= 0)
1739                 close_nointr_nofail(s->epoll_fd);
1740
1741         if (s->signal_fd >= 0)
1742                 close_nointr_nofail(s->signal_fd);
1743
1744         if (s->syslog_fd >= 0)
1745                 close_nointr_nofail(s->syslog_fd);
1746
1747         if (s->native_fd >= 0)
1748                 close_nointr_nofail(s->native_fd);
1749
1750         if (s->stdout_fd >= 0)
1751                 close_nointr_nofail(s->stdout_fd);
1752
1753         if (s->rate_limit)
1754                 journal_rate_limit_free(s->rate_limit);
1755 }
1756
1757 int main(int argc, char *argv[]) {
1758         Server server;
1759         int r;
1760
1761         /* if (getppid() != 1) { */
1762         /*         log_error("This program should be invoked by init only."); */
1763         /*         return EXIT_FAILURE; */
1764         /* } */
1765
1766         if (argc > 1) {
1767                 log_error("This program does not take arguments.");
1768                 return EXIT_FAILURE;
1769         }
1770
1771         log_set_target(LOG_TARGET_CONSOLE);
1772         log_set_max_level(LOG_DEBUG);
1773         log_parse_environment();
1774         log_open();
1775
1776         umask(0022);
1777
1778         r = server_init(&server);
1779         if (r < 0)
1780                 goto finish;
1781
1782         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1783
1784         sd_notify(false,
1785                   "READY=1\n"
1786                   "STATUS=Processing requests...");
1787
1788         server_vacuum(&server);
1789         server_flush_to_var(&server);
1790
1791         for (;;) {
1792                 struct epoll_event event;
1793
1794                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1795                 if (r < 0) {
1796
1797                         if (errno == EINTR)
1798                                 continue;
1799
1800                         log_error("epoll_wait() failed: %m");
1801                         r = -errno;
1802                         goto finish;
1803                 } else if (r == 0)
1804                         break;
1805
1806                 r = process_event(&server, &event);
1807                 if (r < 0)
1808                         goto finish;
1809                 else if (r == 0)
1810                         break;
1811         }
1812
1813         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1814
1815 finish:
1816         sd_notify(false,
1817                   "STATUS=Shutting down...");
1818
1819         server_done(&server);
1820
1821         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1822 }