chiark / gitweb /
journal: disable default debug logging
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "journal-internal.h"
45
/* Upper bound on simultaneously open per-user journal files; find_journal()
 * closes one before opening another once this many are open. */
#define USER_JOURNALS_MAX 1024
/* NOTE(review): presumably caps the number of concurrent stdout streams
 * (Server.n_stdout_streams); the enforcing code is not in view — confirm. */
#define STDOUT_STREAMS_MAX 4096

/* NOTE(review): default rate-limit window and burst size; the code that
 * applies them is not in view — confirm. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long available_space() reuses its cached result before rescanning. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not referenced in the visible code; presumably the
 * re-check interval for /var availability — confirm. */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not referenced in the visible code; presumably a timeout
 * for syslog forwarding — confirm. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)
57
typedef struct StdoutStream StdoutStream;

/* Global state of the journal daemon. */
typedef struct Server {
        /* NOTE(review): judging by the names, the epoll instance, the
         * signalfd and the syslog/native/stdout sockets; their setup is not
         * in view — confirm. */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* While runtime_journal is open, find_journal() writes everything
         * there. Otherwise root/unknown messages go to system_journal and
         * per-user files, keyed by uid (at most USER_JOURNALS_MAX open),
         * live in user_journals. */
        JournalFile *runtime_journal;
        JournalFile *system_journal;
        Hashmap *user_journals;

        /* Handed to journal_file_append_entry() for every entry written. */
        uint64_t seqnum;

        /* NOTE(review): not used in the visible code; presumably the receive
         * buffer for incoming datagrams — confirm. */
        char *buffer;
        size_t buffer_size;

        /* Per-cgroup rate limiter consulted by dispatch_message(). */
        JournalRateLimit *rate_limit;

        /* metrics and compress are copied into each newly opened user
         * journal; max_use and metrics.keep_free also bound
         * available_space() and journal vacuuming. */
        JournalMetrics metrics;
        uint64_t max_use;
        bool compress;

        /* Last available_space() result and its CLOCK_MONOTONIC timestamp;
         * reused for RECHECK_AVAILABLE_SPACE_USEC. server_vacuum() resets
         * the timestamp to force a rescan. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* NOTE(review): not used in the visible code; presumably paired with
         * RECHECK_VAR_AVAILABLE_USEC — confirm. */
        uint64_t var_available_timestamp;

        /* List of StdoutStream objects. NOTE(review): n_stdout_streams
         * presumably counts its entries — confirm. */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
90
/* Parsing state of a stdout stream. stdout_stream_line() consumes one header
 * line per state, in this order: the tag, the default priority (0-7),
 * whether lines carry a "<N>" priority prefix (0/1), and whether to copy
 * output to the console (0/1). In STDOUT_STREAM_RUNNING every further line
 * is logged. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,
        STDOUT_STREAM_PRIORITY,
        STDOUT_STREAM_PRIORITY_PREFIX,
        STDOUT_STREAM_TEE_CONSOLE,
        STDOUT_STREAM_RUNNING
} StdoutStreamState;
98
/* State of one stdout stream. */
struct StdoutStream {
        Server *server;
        StdoutStreamState state;

        int fd;

        /* Peer credentials attached to every message from this stream. */
        struct ucred ucred;

        /* Header settings, parsed in StdoutStreamState order. tag is only
         * assigned for a non-empty tag line (presumably NULL otherwise —
         * confirm); priority is the default for lines without a "<N>"
         * prefix. */
        char *tag;
        int priority;
        bool priority_prefix:1;
        bool tee_console:1;

        /* Pending input; stdout_stream_scan() logs complete lines and splits
         * lines longer than LINE_MAX. length is the number of bytes held. */
        char buffer[LINE_MAX+1];
        size_t length;

        LIST_FIELDS(StdoutStream, stdout_stream);
};

/* Defined further down; dispatch_message_real() calls it before each write. */
static int server_flush_to_var(Server *s);
119
120 static uint64_t available_space(Server *s) {
121         char ids[33];
122         sd_id128_t machine;
123         char *p;
124         const char *f;
125         struct statvfs ss;
126         uint64_t sum = 0, avail = 0, ss_avail = 0;
127         int r;
128         DIR *d;
129         usec_t ts = now(CLOCK_MONOTONIC);
130
131         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
132                 return s->cached_available_space;
133
134         r = sd_id128_get_machine(&machine);
135         if (r < 0)
136                 return 0;
137
138         if (s->system_journal)
139                 f = "/var/log/journal/";
140         else
141                 f = "/run/log/journal/";
142
143         p = strappend(f, sd_id128_to_string(machine, ids));
144         if (!p)
145                 return 0;
146
147         d = opendir(p);
148         free(p);
149
150         if (!d)
151                 return 0;
152
153         if (fstatvfs(dirfd(d), &ss) < 0)
154                 goto finish;
155
156         for (;;) {
157                 struct stat st;
158                 struct dirent buf, *de;
159                 int k;
160
161                 k = readdir_r(d, &buf, &de);
162                 if (k != 0) {
163                         r = -k;
164                         goto finish;
165                 }
166
167                 if (!de)
168                         break;
169
170                 if (!dirent_is_file_with_suffix(de, ".journal"))
171                         continue;
172
173                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
174                         continue;
175
176                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
177         }
178
179         avail = sum >= s->max_use ? 0 : s->max_use - sum;
180
181         ss_avail = ss.f_bsize * ss.f_bavail;
182
183         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
184
185         if (ss_avail < avail)
186                 avail = ss_avail;
187
188         s->cached_available_space = avail;
189         s->cached_available_space_timestamp = ts;
190
191 finish:
192         closedir(d);
193
194         return avail;
195 }
196
197 static void fix_perms(JournalFile *f, uid_t uid) {
198         acl_t acl;
199         acl_entry_t entry;
200         acl_permset_t permset;
201         int r;
202
203         assert(f);
204
205         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
206         if (r < 0)
207                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
208
209         if (uid <= 0)
210                 return;
211
212         acl = acl_get_fd(f->fd);
213         if (!acl) {
214                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
215                 return;
216         }
217
218         r = acl_find_uid(acl, uid, &entry);
219         if (r <= 0) {
220
221                 if (acl_create_entry(&acl, &entry) < 0 ||
222                     acl_set_tag_type(entry, ACL_USER) < 0 ||
223                     acl_set_qualifier(entry, &uid) < 0) {
224                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
225                         goto finish;
226                 }
227         }
228
229         if (acl_get_permset(entry, &permset) < 0 ||
230             acl_add_perm(permset, ACL_READ) < 0 ||
231             acl_calc_mask(&acl) < 0) {
232                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
233                 goto finish;
234         }
235
236         if (acl_set_fd(f->fd, acl) < 0)
237                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
238
239 finish:
240         acl_free(acl);
241 }
242
243 static JournalFile* find_journal(Server *s, uid_t uid) {
244         char *p;
245         int r;
246         JournalFile *f;
247         char ids[33];
248         sd_id128_t machine;
249
250         assert(s);
251
252         /* We split up user logs only on /var, not on /run. If the
253          * runtime file is open, we write to it exclusively, in order
254          * to guarantee proper order as soon as we flush /run to
255          * /var and close the runtime file. */
256
257         if (s->runtime_journal)
258                 return s->runtime_journal;
259
260         if (uid <= 0)
261                 return s->system_journal;
262
263         r = sd_id128_get_machine(&machine);
264         if (r < 0)
265                 return s->system_journal;
266
267         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
268         if (f)
269                 return f;
270
271         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
272                 return s->system_journal;
273
274         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
275                 /* Too many open? Then let's close one */
276                 f = hashmap_steal_first(s->user_journals);
277                 assert(f);
278                 journal_file_close(f);
279         }
280
281         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
282         free(p);
283
284         if (r < 0)
285                 return s->system_journal;
286
287         fix_perms(f, uid);
288         f->metrics = s->metrics;
289         f->compress = s->compress;
290
291         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
292         if (r < 0) {
293                 journal_file_close(f);
294                 return s->system_journal;
295         }
296
297         return f;
298 }
299
300 static void server_vacuum(Server *s) {
301         Iterator i;
302         void *k;
303         char *p;
304         char ids[33];
305         sd_id128_t machine;
306         int r;
307         JournalFile *f;
308
309         log_info("Rotating...");
310
311         if (s->runtime_journal) {
312                 r = journal_file_rotate(&s->runtime_journal);
313                 if (r < 0)
314                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
315         }
316
317         if (s->system_journal) {
318                 r = journal_file_rotate(&s->system_journal);
319                 if (r < 0)
320                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
321         }
322
323         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
324                 r = journal_file_rotate(&f);
325                 if (r < 0)
326                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
327                 else
328                         hashmap_replace(s->user_journals, k, f);
329         }
330
331         log_info("Vacuuming...");
332
333         r = sd_id128_get_machine(&machine);
334         if (r < 0) {
335                 log_error("Failed to get machine ID: %s", strerror(-r));
336                 return;
337         }
338
339         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
340                 log_error("Out of memory.");
341                 return;
342         }
343
344         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
345         if (r < 0 && r != -ENOENT)
346                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
347         free(p);
348
349         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
350                 log_error("Out of memory.");
351                 return;
352         }
353
354         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
355         if (r < 0 && r != -ENOENT)
356                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
357         free(p);
358
359         s->cached_available_space_timestamp = 0;
360 }
361
362 static char *shortened_cgroup_path(pid_t pid) {
363         int r;
364         char *process_path, *init_path, *path;
365
366         assert(pid > 0);
367
368         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
369         if (r < 0)
370                 return NULL;
371
372         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
373         if (r < 0) {
374                 free(process_path);
375                 return NULL;
376         }
377
378         if (streq(init_path, "/"))
379                 init_path[0] = 0;
380
381         if (startswith(process_path, init_path)) {
382                 char *p;
383
384                 p = strdup(process_path + strlen(init_path));
385                 if (!p) {
386                         free(process_path);
387                         free(init_path);
388                         return NULL;
389                 }
390                 path = p;
391         } else {
392                 path = process_path;
393                 process_path = NULL;
394         }
395
396         free(process_path);
397         free(init_path);
398
399         return path;
400 }
401
402 static void dispatch_message_real(Server *s,
403                              struct iovec *iovec, unsigned n, unsigned m,
404                              struct ucred *ucred,
405                              struct timeval *tv) {
406
407         char *pid = NULL, *uid = NULL, *gid = NULL,
408                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
409                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
410                 *audit_session = NULL, *audit_loginuid = NULL,
411                 *exe = NULL, *cgroup = NULL;
412
413         char idbuf[33];
414         sd_id128_t id;
415         int r;
416         char *t;
417         uid_t loginuid = 0, realuid = 0;
418         JournalFile *f;
419         bool vacuumed = false;
420
421         assert(s);
422         assert(iovec);
423         assert(n > 0);
424         assert(n + 13 <= m);
425
426         if (ucred) {
427                 uint32_t session;
428                 char *path;
429
430                 realuid = ucred->uid;
431
432                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
433                         IOVEC_SET_STRING(iovec[n++], pid);
434
435                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
436                         IOVEC_SET_STRING(iovec[n++], uid);
437
438                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
439                         IOVEC_SET_STRING(iovec[n++], gid);
440
441                 r = get_process_comm(ucred->pid, &t);
442                 if (r >= 0) {
443                         comm = strappend("_COMM=", t);
444                         if (comm)
445                                 IOVEC_SET_STRING(iovec[n++], comm);
446                         free(t);
447                 }
448
449                 r = get_process_exe(ucred->pid, &t);
450                 if (r >= 0) {
451                         exe = strappend("_EXE=", t);
452                         if (comm)
453                                 IOVEC_SET_STRING(iovec[n++], exe);
454                         free(t);
455                 }
456
457                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
458                 if (r >= 0) {
459                         cmdline = strappend("_CMDLINE=", t);
460                         if (cmdline)
461                                 IOVEC_SET_STRING(iovec[n++], cmdline);
462                         free(t);
463                 }
464
465                 r = audit_session_from_pid(ucred->pid, &session);
466                 if (r >= 0)
467                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
468                                 IOVEC_SET_STRING(iovec[n++], audit_session);
469
470                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
471                 if (r >= 0)
472                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
473                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
474
475                 path = shortened_cgroup_path(ucred->pid);
476                 if (path) {
477                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
478                         if (cgroup)
479                                 IOVEC_SET_STRING(iovec[n++], cgroup);
480
481                         free(path);
482                 }
483         }
484
485         if (tv) {
486                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
487                              (unsigned long long) timeval_load(tv)) >= 0)
488                         IOVEC_SET_STRING(iovec[n++], source_time);
489         }
490
491         /* Note that strictly speaking storing the boot id here is
492          * redundant since the entry includes this in-line
493          * anyway. However, we need this indexed, too. */
494         r = sd_id128_get_boot(&id);
495         if (r >= 0)
496                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
497                         IOVEC_SET_STRING(iovec[n++], boot_id);
498
499         r = sd_id128_get_machine(&id);
500         if (r >= 0)
501                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
502                         IOVEC_SET_STRING(iovec[n++], machine_id);
503
504         t = gethostname_malloc();
505         if (t) {
506                 hostname = strappend("_HOSTNAME=", t);
507                 if (hostname)
508                         IOVEC_SET_STRING(iovec[n++], hostname);
509                 free(t);
510         }
511
512         assert(n <= m);
513
514         server_flush_to_var(s);
515
516 retry:
517         f = find_journal(s, realuid == 0 ? 0 : loginuid);
518         if (!f)
519                 log_warning("Dropping message, as we can't find a place to store the data.");
520         else {
521                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
522
523                 if (r == -E2BIG && !vacuumed) {
524                         log_info("Allocation limit reached.");
525
526                         server_vacuum(s);
527                         vacuumed = true;
528
529                         log_info("Retrying write.");
530                         goto retry;
531                 }
532
533                 if (r < 0)
534                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
535         }
536
537         free(pid);
538         free(uid);
539         free(gid);
540         free(comm);
541         free(exe);
542         free(cmdline);
543         free(source_time);
544         free(boot_id);
545         free(machine_id);
546         free(hostname);
547         free(audit_session);
548         free(audit_loginuid);
549         free(cgroup);
550 }
551
552 static void dispatch_message(Server *s,
553                              struct iovec *iovec, unsigned n, unsigned m,
554                              struct ucred *ucred,
555                              struct timeval *tv,
556                              int priority) {
557         int rl;
558         char *path = NULL, *c;
559
560         assert(s);
561         assert(iovec || n == 0);
562
563         if (n == 0)
564                 return;
565
566         if (!ucred)
567                 goto finish;
568
569         path = shortened_cgroup_path(ucred->pid);
570         if (!path)
571                 goto finish;
572
573         /* example: /user/lennart/3/foobar
574          *          /system/dbus.service/foobar
575          *
576          * So let's cut of everything past the third /, since that is
577          * wher user directories start */
578
579         c = strchr(path, '/');
580         if (c) {
581                 c = strchr(c+1, '/');
582                 if (c) {
583                         c = strchr(c+1, '/');
584                         if (c)
585                                 *c = 0;
586                 }
587         }
588
589         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
590
591         if (rl == 0) {
592                 free(path);
593                 return;
594         }
595
596         if (rl > 1) {
597                 int j = 0;
598                 char suppress_message[LINE_MAX];
599                 struct iovec suppress_iovec[15];
600
601                 /* Write a suppression message if we suppressed something */
602
603                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
604                 char_array_0(suppress_message);
605
606                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
607                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
608
609                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
610         }
611
612         free(path);
613
614 finish:
615         dispatch_message_real(s, iovec, n, m, ucred, tv);
616 }
617
618 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
619         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
620         struct iovec iovec[16];
621         unsigned n = 0;
622         int priority = LOG_USER | LOG_INFO;
623
624         assert(s);
625         assert(buf);
626
627         parse_syslog_priority((char**) &buf, &priority);
628         skip_syslog_date((char**) &buf);
629
630         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
631                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
632
633         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
634                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
635
636         message = strappend("MESSAGE=", buf);
637         if (message)
638                 IOVEC_SET_STRING(iovec[n++], message);
639
640         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
641
642         free(message);
643         free(syslog_facility);
644         free(syslog_priority);
645 }
646
/* Decide whether the first @l bytes at @p form an acceptable name for a
 * client-supplied journal field.
 *
 * Accepted names are 1 to 64 characters drawn from A-Z, 0-9 and '_', and
 * start with neither a digit nor an underscore. This roughly follows the
 * POSIX recommendations for environment variable names
 * (http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html).
 * The leading-underscore ban keeps clients from supplying the protected
 * fields that the server adds itself. */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                const char c = p[i];
                const bool upper = c >= 'A' && c <= 'Z';
                const bool digit = c >= '0' && c <= '9';

                if (!upper && !digit && c != '_')
                        return false;
        }

        return true;
}
681
682 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
683         struct iovec *iovec = NULL;
684         unsigned n = 0, m = 0, j;
685         const char *p;
686         size_t remaining;
687         int priority = LOG_INFO;
688
689         assert(s);
690         assert(buffer || n == 0);
691
692         p = buffer;
693         remaining = buffer_size;
694
695         while (remaining > 0) {
696                 const char *e, *q;
697
698                 e = memchr(p, '\n', remaining);
699
700                 if (!e) {
701                         /* Trailing noise, let's ignore it, and flush what we collected */
702                         log_debug("Received message with trailing noise, ignoring.");
703                         break;
704                 }
705
706                 if (e == p) {
707                         /* Entry separator */
708                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
709                         n = 0;
710                         priority = LOG_INFO;
711
712                         p++;
713                         remaining--;
714                         continue;
715                 }
716
717                 if (*p == '.' || *p == '#') {
718                         /* Ignore control commands for now, and
719                          * comments too. */
720                         remaining -= (e - p) + 1;
721                         p = e + 1;
722                         continue;
723                 }
724
725                 /* A property follows */
726
727                 if (n+13 >= m) {
728                         struct iovec *c;
729                         unsigned u;
730
731                         u = MAX((n+13U) * 2U, 4U);
732                         c = realloc(iovec, u * sizeof(struct iovec));
733                         if (!c) {
734                                 log_error("Out of memory");
735                                 break;
736                         }
737
738                         iovec = c;
739                         m = u;
740                 }
741
742                 q = memchr(p, '=', e - p);
743                 if (q) {
744                         if (valid_user_field(p, q - p)) {
745                                 /* If the field name starts with an
746                                  * underscore, skip the variable,
747                                  * since that indidates a trusted
748                                  * field */
749                                 iovec[n].iov_base = (char*) p;
750                                 iovec[n].iov_len = e - p;
751                                 n++;
752
753                                 /* We need to determine the priority
754                                  * of this entry for the rate limiting
755                                  * logic */
756                                 if (e - p == 10 &&
757                                     memcmp(p, "PRIORITY=", 10) == 0 &&
758                                     p[10] >= '0' &&
759                                     p[10] <= '9')
760                                         priority = p[10] - '0';
761                         }
762
763                         remaining -= (e - p) + 1;
764                         p = e + 1;
765                         continue;
766                 } else {
767                         uint64_t l;
768                         char *k;
769
770                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
771                                 log_debug("Failed to parse message, ignoring.");
772                                 break;
773                         }
774
775                         memcpy(&l, e + 1, sizeof(uint64_t));
776                         l = le64toh(l);
777
778                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
779                             e[1+sizeof(uint64_t)+l] != '\n') {
780                                 log_debug("Failed to parse message, ignoring.");
781                                 break;
782                         }
783
784                         k = malloc((e - p) + 1 + l);
785                         if (!k) {
786                                 log_error("Out of memory");
787                                 break;
788                         }
789
790                         memcpy(k, p, e - p);
791                         k[e - p] = '=';
792                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
793
794                         if (valid_user_field(p, e - p)) {
795                                 iovec[n].iov_base = k;
796                                 iovec[n].iov_len = (e - p) + 1 + l;
797                                 n++;
798                         } else
799                                 free(k);
800
801                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
802                         p = e + 1 + sizeof(uint64_t) + l + 1;
803                 }
804         }
805
806         dispatch_message(s, iovec, n, m, ucred, tv, priority);
807
808         for (j = 0; j < n; j++)
809                 if (iovec[j].iov_base < buffer ||
810                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
811                         free(iovec[j].iov_base);
812 }
813
814 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
815         struct iovec iovec[15];
816         char *message = NULL, *syslog_priority = NULL;
817         unsigned n = 0;
818         size_t tag_len;
819         int priority;
820
821         assert(s);
822         assert(p);
823
824         priority = s->priority;
825
826         if (s->priority_prefix &&
827             l > 3 &&
828             p[0] == '<' &&
829             p[1] >= '0' && p[1] <= '7' &&
830             p[2] == '>') {
831
832                 priority = p[1] - '0';
833                 p += 3;
834                 l -= 3;
835         }
836
837         if (l <= 0)
838                 return 0;
839
840         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
841                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
842
843         tag_len = s->tag ? strlen(s->tag) + 2: 0;
844         message = malloc(8 + tag_len + l);
845         if (message) {
846                 memcpy(message, "MESSAGE=", 8);
847
848                 if (s->tag) {
849                         memcpy(message+8, s->tag, tag_len-2);
850                         memcpy(message+8+tag_len-2, ": ", 2);
851                 }
852
853                 memcpy(message+8+tag_len, p, l);
854                 iovec[n].iov_base = message;
855                 iovec[n].iov_len = 8+tag_len+l;
856                 n++;
857         }
858
859         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
860
861         if (s->tee_console) {
862                 int console;
863
864                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
865                 if (console >= 0) {
866                         n = 0;
867                         if (s->tag) {
868                                 IOVEC_SET_STRING(iovec[n++], s->tag);
869                                 IOVEC_SET_STRING(iovec[n++], ": ");
870                         }
871
872                         iovec[n].iov_base = (void*) p;
873                         iovec[n].iov_len = l;
874                         n++;
875
876                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
877
878                         writev(console, iovec, n);
879                 }
880         }
881
882         free(message);
883         free(syslog_priority);
884
885         return 0;
886 }
887
888 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
889         assert(s);
890         assert(p);
891
892         while (l > 0 && strchr(WHITESPACE, *p)) {
893                 l--;
894                 p++;
895         }
896
897         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
898                 l--;
899
900         switch (s->state) {
901
902         case STDOUT_STREAM_TAG:
903
904                 if (l > 0) {
905                         s->tag = strndup(p, l);
906                         if (!s->tag) {
907                                 log_error("Out of memory");
908                                 return -EINVAL;
909                         }
910                 }
911
912                 s->state = STDOUT_STREAM_PRIORITY;
913                 return 0;
914
915         case STDOUT_STREAM_PRIORITY:
916                 if (l != 1 || *p < '0' || *p > '7') {
917                         log_warning("Failed to parse log priority line.");
918                         return -EINVAL;
919                 }
920
921                 s->priority = *p - '0';
922                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
923                 return 0;
924
925         case STDOUT_STREAM_PRIORITY_PREFIX:
926                 if (l != 1 || *p < '0' || *p > '1') {
927                         log_warning("Failed to parse priority prefix line.");
928                         return -EINVAL;
929                 }
930
931                 s->priority_prefix = *p - '0';
932                 s->state = STDOUT_STREAM_TEE_CONSOLE;
933                 return 0;
934
935         case STDOUT_STREAM_TEE_CONSOLE:
936                 if (l != 1 || *p < '0' || *p > '1') {
937                         log_warning("Failed to parse tee to console line.");
938                         return -EINVAL;
939                 }
940
941                 s->tee_console = *p - '0';
942                 s->state = STDOUT_STREAM_RUNNING;
943                 return 0;
944
945         case STDOUT_STREAM_RUNNING:
946                 return stdout_stream_log(s, p, l);
947         }
948
949         assert_not_reached("Unknown stream state");
950 }
951
952 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
953         char *p;
954         size_t remaining;
955         int r;
956
957         assert(s);
958
959         p = s->buffer;
960         remaining = s->length;
961         for (;;) {
962                 char *end;
963                 size_t skip;
964
965                 end = memchr(p, '\n', remaining);
966                 if (!end) {
967                         if (remaining >= LINE_MAX) {
968                                 end = p + LINE_MAX;
969                                 skip = LINE_MAX;
970                         } else
971                                 break;
972                 } else
973                         skip = end - p + 1;
974
975                 r = stdout_stream_line(s, p, end - p);
976                 if (r < 0)
977                         return r;
978
979                 remaining -= skip;
980                 p += skip;
981         }
982
983         if (force_flush && remaining > 0) {
984                 r = stdout_stream_line(s, p, remaining);
985                 if (r < 0)
986                         return r;
987
988                 p += remaining;
989                 remaining = 0;
990         }
991
992         if (p > s->buffer) {
993                 memmove(s->buffer, p, remaining);
994                 s->length = remaining;
995         }
996
997         return 0;
998 }
999
1000 static int stdout_stream_process(StdoutStream *s) {
1001         ssize_t l;
1002         int r;
1003
1004         assert(s);
1005
1006         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
1007         if (l < 0) {
1008
1009                 if (errno == EAGAIN)
1010                         return 0;
1011
1012                 log_warning("Failed to read from stream: %m");
1013                 return -errno;
1014         }
1015
1016         if (l == 0) {
1017                 r = stdout_stream_scan(s, true);
1018                 if (r < 0)
1019                         return r;
1020
1021                 return 0;
1022         }
1023
1024         s->length += l;
1025         r = stdout_stream_scan(s, false);
1026         if (r < 0)
1027                 return r;
1028
1029         return 1;
1030
1031 }
1032
1033 static void stdout_stream_free(StdoutStream *s) {
1034         assert(s);
1035
1036         if (s->server) {
1037                 assert(s->server->n_stdout_streams > 0);
1038                 s->server->n_stdout_streams --;
1039                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1040         }
1041
1042         if (s->fd >= 0) {
1043                 if (s->server)
1044                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1045
1046                 close_nointr_nofail(s->fd);
1047         }
1048
1049         free(s->tag);
1050         free(s);
1051 }
1052
1053 static int stdout_stream_new(Server *s) {
1054         StdoutStream *stream;
1055         int fd, r;
1056         socklen_t len;
1057         struct epoll_event ev;
1058
1059         assert(s);
1060
1061         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1062         if (fd < 0) {
1063                 if (errno == EAGAIN)
1064                         return 0;
1065
1066                 log_error("Failed to accept stdout connection: %m");
1067                 return -errno;
1068         }
1069
1070         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1071                 log_warning("Too many stdout streams, refusing connection.");
1072                 close_nointr_nofail(fd);
1073                 return 0;
1074         }
1075
1076         stream = new0(StdoutStream, 1);
1077         if (!stream) {
1078                 log_error("Out of memory.");
1079                 close_nointr_nofail(fd);
1080                 return -ENOMEM;
1081         }
1082
1083         stream->fd = fd;
1084
1085         len = sizeof(stream->ucred);
1086         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1087                 log_error("Failed to determine peer credentials: %m");
1088                 r = -errno;
1089                 goto fail;
1090         }
1091
1092         if (shutdown(fd, SHUT_WR) < 0) {
1093                 log_error("Failed to shutdown writing side of socket: %m");
1094                 r = -errno;
1095                 goto fail;
1096         }
1097
1098         zero(ev);
1099         ev.data.ptr = stream;
1100         ev.events = EPOLLIN;
1101         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1102                 log_error("Failed to add stream to event loop: %m");
1103                 r = -errno;
1104                 goto fail;
1105         }
1106
1107         stream->server = s;
1108         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1109         s->n_stdout_streams ++;
1110
1111         return 0;
1112
1113 fail:
1114         stdout_stream_free(stream);
1115         return r;
1116 }
1117
1118 static int system_journal_open(Server *s) {
1119         int r;
1120         char *fn;
1121         sd_id128_t machine;
1122         char ids[33];
1123
1124         r = sd_id128_get_machine(&machine);
1125         if (r < 0)
1126                 return r;
1127
1128         sd_id128_to_string(machine, ids);
1129
1130         if (!s->system_journal) {
1131
1132                 /* First try to create the machine path, but not the prefix */
1133                 fn = strappend("/var/log/journal/", ids);
1134                 if (!fn)
1135                         return -ENOMEM;
1136                 (void) mkdir(fn, 0755);
1137                 free(fn);
1138
1139                 /* The create the system journal file */
1140                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1141                 if (!fn)
1142                         return -ENOMEM;
1143
1144                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1145                 free(fn);
1146
1147                 if (r >= 0) {
1148                         s->system_journal->metrics = s->metrics;
1149                         s->system_journal->compress = s->compress;
1150
1151                         fix_perms(s->system_journal, 0);
1152                 } else if (r < 0) {
1153
1154                         if (r == -ENOENT)
1155                                 r = 0;
1156                         else {
1157                                 log_error("Failed to open system journal: %s", strerror(-r));
1158                                 return r;
1159                         }
1160                 }
1161         }
1162
1163         if (!s->runtime_journal) {
1164
1165                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1166                 if (!fn)
1167                         return -ENOMEM;
1168
1169                 if (s->system_journal) {
1170
1171                         /* Try to open the runtime journal, but only
1172                          * if it already exists, so that we can flush
1173                          * it into the system journal */
1174
1175                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1176                         free(fn);
1177
1178                         if (r < 0) {
1179
1180                                 if (r == -ENOENT)
1181                                         r = 0;
1182                                 else {
1183                                         log_error("Failed to open runtime journal: %s", strerror(-r));
1184                                         return r;
1185                                 }
1186                         }
1187
1188                 } else {
1189
1190                         /* OK, we really need the runtime journal, so create
1191                          * it if necessary. */
1192
1193                         (void) mkdir_parents(fn, 0755);
1194                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1195                         free(fn);
1196
1197                         if (r < 0) {
1198                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1199                                 return r;
1200                         }
1201                 }
1202
1203                 if (s->runtime_journal) {
1204                         s->runtime_journal->metrics = s->metrics;
1205                         s->runtime_journal->compress = s->compress;
1206
1207                         fix_perms(s->runtime_journal, 0);
1208                 }
1209         }
1210
1211         return r;
1212 }
1213
1214 static int server_flush_to_var(Server *s) {
1215         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1216         Object *o = NULL;
1217         int r;
1218         sd_id128_t machine;
1219         sd_journal *j;
1220         usec_t ts;
1221
1222         assert(s);
1223
1224         if (!s->runtime_journal)
1225                 return 0;
1226
1227         ts = now(CLOCK_MONOTONIC);
1228         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1229                 return 0;
1230
1231         s->var_available_timestamp = ts;
1232
1233         system_journal_open(s);
1234
1235         if (!s->system_journal)
1236                 return 0;
1237
1238         r = sd_id128_get_machine(&machine);
1239         if (r < 0) {
1240                 log_error("Failed to get machine id: %s", strerror(-r));
1241                 return r;
1242         }
1243
1244         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1245         if (r < 0) {
1246                 log_error("Failed to read runtime journal: %s", strerror(-r));
1247                 return r;
1248         }
1249
1250         SD_JOURNAL_FOREACH(j) {
1251                 JournalFile *f;
1252
1253                 f = j->current_file;
1254                 assert(f && f->current_offset > 0);
1255
1256                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1257                 if (r < 0) {
1258                         log_error("Can't read entry: %s", strerror(-r));
1259                         goto finish;
1260                 }
1261
1262                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1263                 if (r == -E2BIG) {
1264                         log_info("Allocation limit reached.");
1265
1266                         journal_file_post_change(s->system_journal);
1267                         server_vacuum(s);
1268
1269                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1270                 }
1271
1272                 if (r < 0) {
1273                         log_error("Can't write entry: %s", strerror(-r));
1274                         goto finish;
1275                 }
1276         }
1277
1278 finish:
1279         journal_file_post_change(s->system_journal);
1280
1281         journal_file_close(s->runtime_journal);
1282         s->runtime_journal = NULL;
1283
1284         if (r >= 0) {
1285                 sd_id128_to_string(machine, path + 17);
1286                 rm_rf(path, false, true, false);
1287         }
1288
1289         return r;
1290 }
1291
1292 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1293         struct msghdr msghdr;
1294         struct iovec iovec;
1295         struct cmsghdr *cmsg;
1296         union {
1297                 struct cmsghdr cmsghdr;
1298                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1299                             CMSG_SPACE(sizeof(struct timeval))];
1300         } control;
1301         union sockaddr_union sa;
1302
1303         assert(s);
1304
1305         zero(msghdr);
1306
1307         zero(iovec);
1308         iovec.iov_base = (void*) buffer;
1309         iovec.iov_len = length;
1310         msghdr.msg_iov = &iovec;
1311         msghdr.msg_iovlen = 1;
1312
1313         zero(sa);
1314         sa.un.sun_family = AF_UNIX;
1315         strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1316         msghdr.msg_name = &sa;
1317         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1318
1319         zero(control);
1320         msghdr.msg_control = &control;
1321         msghdr.msg_controllen = sizeof(control);
1322
1323         cmsg = CMSG_FIRSTHDR(&msghdr);
1324         cmsg->cmsg_level = SOL_SOCKET;
1325         cmsg->cmsg_type = SCM_CREDENTIALS;
1326         cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1327         memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1328         msghdr.msg_controllen = cmsg->cmsg_len;
1329
1330         /* Forward the syslog message we received via /dev/log to
1331          * /run/systemd/syslog. Unfortunately we currently can't set
1332          * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1333
1334         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1335                 return;
1336
1337         if (errno == ESRCH) {
1338                 struct ucred u;
1339
1340                 /* Hmm, presumably the sender process vanished
1341                  * by now, so let's fix it as good as we
1342                  * can, and retry */
1343
1344                 u = *ucred;
1345                 u.pid = getpid();
1346                 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1347
1348                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1349                         return;
1350         }
1351
1352         log_debug("Failed to forward syslog message: %m");
1353 }
1354
1355 static int process_event(Server *s, struct epoll_event *ev) {
1356         assert(s);
1357
1358         if (ev->data.fd == s->signal_fd) {
1359                 struct signalfd_siginfo sfsi;
1360                 ssize_t n;
1361
1362                 if (ev->events != EPOLLIN) {
1363                         log_info("Got invalid event from epoll.");
1364                         return -EIO;
1365                 }
1366
1367                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1368                 if (n != sizeof(sfsi)) {
1369
1370                         if (n >= 0)
1371                                 return -EIO;
1372
1373                         if (errno == EINTR || errno == EAGAIN)
1374                                 return 0;
1375
1376                         return -errno;
1377                 }
1378
1379                 if (sfsi.ssi_signo == SIGUSR1) {
1380                         server_flush_to_var(s);
1381                         return 0;
1382                 }
1383
1384                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1385                 return 0;
1386
1387         } else if (ev->data.fd == s->native_fd ||
1388                    ev->data.fd == s->syslog_fd) {
1389
1390                 if (ev->events != EPOLLIN) {
1391                         log_info("Got invalid event from epoll.");
1392                         return -EIO;
1393                 }
1394
1395                 for (;;) {
1396                         struct msghdr msghdr;
1397                         struct iovec iovec;
1398                         struct ucred *ucred = NULL;
1399                         struct timeval *tv = NULL;
1400                         struct cmsghdr *cmsg;
1401                         union {
1402                                 struct cmsghdr cmsghdr;
1403                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1404                                             CMSG_SPACE(sizeof(struct timeval))];
1405                         } control;
1406                         ssize_t n;
1407                         int v;
1408
1409                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1410                                 log_error("SIOCINQ failed: %m");
1411                                 return -errno;
1412                         }
1413
1414                         if (v <= 0)
1415                                 return 1;
1416
1417                         if (s->buffer_size < (size_t) v) {
1418                                 void *b;
1419                                 size_t l;
1420
1421                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1422                                 b = realloc(s->buffer, l+1);
1423
1424                                 if (!b) {
1425                                         log_error("Couldn't increase buffer.");
1426                                         return -ENOMEM;
1427                                 }
1428
1429                                 s->buffer_size = l;
1430                                 s->buffer = b;
1431                         }
1432
1433                         zero(iovec);
1434                         iovec.iov_base = s->buffer;
1435                         iovec.iov_len = s->buffer_size;
1436
1437                         zero(control);
1438                         zero(msghdr);
1439                         msghdr.msg_iov = &iovec;
1440                         msghdr.msg_iovlen = 1;
1441                         msghdr.msg_control = &control;
1442                         msghdr.msg_controllen = sizeof(control);
1443
1444                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1445                         if (n < 0) {
1446
1447                                 if (errno == EINTR || errno == EAGAIN)
1448                                         return 1;
1449
1450                                 log_error("recvmsg() failed: %m");
1451                                 return -errno;
1452                         }
1453
1454                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1455
1456                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1457                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1458                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1459                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1460                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1461                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1462                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1463                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1464                         }
1465
1466                         if (ev->data.fd == s->syslog_fd) {
1467                                 char *e;
1468
1469                                 e = memchr(s->buffer, '\n', n);
1470                                 if (e)
1471                                         *e = 0;
1472                                 else
1473                                         s->buffer[n] = 0;
1474
1475                                 forward_syslog(s, s->buffer, n, ucred, tv);
1476                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1477                         } else
1478                                 process_native_message(s, s->buffer, n, ucred, tv);
1479                 }
1480
1481                 return 1;
1482
1483         } else if (ev->data.fd == s->stdout_fd) {
1484
1485                 if (ev->events != EPOLLIN) {
1486                         log_info("Got invalid event from epoll.");
1487                         return -EIO;
1488                 }
1489
1490                 stdout_stream_new(s);
1491                 return 1;
1492
1493         } else {
1494                 StdoutStream *stream;
1495
1496                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1497                         log_info("Got invalid event from epoll.");
1498                         return -EIO;
1499                 }
1500
1501                 /* If it is none of the well-known fds, it must be an
1502                  * stdout stream fd. Note that this is a bit ugly here
1503                  * (since we rely that none of the well-known fds
1504                  * could be interpreted as pointer), but nonetheless
1505                  * safe, since the well-known fds would never get an
1506                  * fd > 4096, i.e. beyond the first memory page */
1507
1508                 stream = ev->data.ptr;
1509
1510                 if (stdout_stream_process(stream) <= 0)
1511                         stdout_stream_free(stream);
1512
1513                 return 1;
1514         }
1515
1516         log_error("Unknown event.");
1517         return 0;
1518 }
1519
1520 static int open_syslog_socket(Server *s) {
1521         union sockaddr_union sa;
1522         int one, r;
1523         struct epoll_event ev;
1524         struct timeval tv;
1525
1526         assert(s);
1527
1528         if (s->syslog_fd < 0) {
1529
1530                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1531                 if (s->syslog_fd < 0) {
1532                         log_error("socket() failed: %m");
1533                         return -errno;
1534                 }
1535
1536                 zero(sa);
1537                 sa.un.sun_family = AF_UNIX;
1538                 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1539
1540                 unlink(sa.un.sun_path);
1541
1542                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1543                 if (r < 0) {
1544                         log_error("bind() failed: %m");
1545                         return -errno;
1546                 }
1547
1548                 chmod(sa.un.sun_path, 0666);
1549         }
1550
1551         one = 1;
1552         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1553         if (r < 0) {
1554                 log_error("SO_PASSCRED failed: %m");
1555                 return -errno;
1556         }
1557
1558         one = 1;
1559         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1560         if (r < 0) {
1561                 log_error("SO_TIMESTAMP failed: %m");
1562                 return -errno;
1563         }
1564
1565         /* Since we use the same socket for forwarding this to some
1566          * other syslog implementation, make sure we don't hang
1567          * forever */
1568         timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1569         if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1570                 log_error("SO_SNDTIMEO failed: %m");
1571                 return -errno;
1572         }
1573
1574         zero(ev);
1575         ev.events = EPOLLIN;
1576         ev.data.fd = s->syslog_fd;
1577         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1578                 log_error("Failed to add syslog server fd to epoll object: %m");
1579                 return -errno;
1580         }
1581
1582         return 0;
1583 }
1584
1585 static int open_native_socket(Server*s) {
1586         union sockaddr_union sa;
1587         int one, r;
1588         struct epoll_event ev;
1589
1590         assert(s);
1591
1592         if (s->native_fd < 0) {
1593
1594                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1595                 if (s->native_fd < 0) {
1596                         log_error("socket() failed: %m");
1597                         return -errno;
1598                 }
1599
1600                 zero(sa);
1601                 sa.un.sun_family = AF_UNIX;
1602                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1603
1604                 unlink(sa.un.sun_path);
1605
1606                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1607                 if (r < 0) {
1608                         log_error("bind() failed: %m");
1609                         return -errno;
1610                 }
1611
1612                 chmod(sa.un.sun_path, 0666);
1613         }
1614
1615         one = 1;
1616         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1617         if (r < 0) {
1618                 log_error("SO_PASSCRED failed: %m");
1619                 return -errno;
1620         }
1621
1622         one = 1;
1623         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1624         if (r < 0) {
1625                 log_error("SO_TIMESTAMP failed: %m");
1626                 return -errno;
1627         }
1628
1629         zero(ev);
1630         ev.events = EPOLLIN;
1631         ev.data.fd = s->native_fd;
1632         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1633                 log_error("Failed to add native server fd to epoll object: %m");
1634                 return -errno;
1635         }
1636
1637         return 0;
1638 }
1639
1640 static int open_stdout_socket(Server *s) {
1641         union sockaddr_union sa;
1642         int r;
1643         struct epoll_event ev;
1644
1645         assert(s);
1646
1647         if (s->stdout_fd < 0) {
1648
1649                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1650                 if (s->stdout_fd < 0) {
1651                         log_error("socket() failed: %m");
1652                         return -errno;
1653                 }
1654
1655                 zero(sa);
1656                 sa.un.sun_family = AF_UNIX;
1657                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1658
1659                 unlink(sa.un.sun_path);
1660
1661                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1662                 if (r < 0) {
1663                         log_error("bind() failed: %m");
1664                         return -errno;
1665                 }
1666
1667                 chmod(sa.un.sun_path, 0666);
1668
1669                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1670                         log_error("liste() failed: %m");
1671                         return -errno;
1672                 }
1673         }
1674
1675         zero(ev);
1676         ev.events = EPOLLIN;
1677         ev.data.fd = s->stdout_fd;
1678         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1679                 log_error("Failed to add stdout server fd to epoll object: %m");
1680                 return -errno;
1681         }
1682
1683         return 0;
1684 }
1685
1686 static int open_signalfd(Server *s) {
1687         sigset_t mask;
1688         struct epoll_event ev;
1689
1690         assert(s);
1691
1692         assert_se(sigemptyset(&mask) == 0);
1693         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1694         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1695
1696         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1697         if (s->signal_fd < 0) {
1698                 log_error("signalfd(): %m");
1699                 return -errno;
1700         }
1701
1702         zero(ev);
1703         ev.events = EPOLLIN;
1704         ev.data.fd = s->signal_fd;
1705
1706         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1707                 log_error("epoll_ctl(): %m");
1708                 return -errno;
1709         }
1710
1711         return 0;
1712 }
1713
1714 static int server_init(Server *s) {
1715         int n, r, fd;
1716
1717         assert(s);
1718
1719         zero(*s);
1720         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1721         s->metrics.max_size = DEFAULT_MAX_SIZE;
1722         s->metrics.min_size = DEFAULT_MIN_SIZE;
1723         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1724         s->max_use = DEFAULT_MAX_USE;
1725         s->compress = true;
1726
1727         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1728         if (!s->user_journals) {
1729                 log_error("Out of memory.");
1730                 return -ENOMEM;
1731         }
1732
1733         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1734         if (s->epoll_fd < 0) {
1735                 log_error("Failed to create epoll object: %m");
1736                 return -errno;
1737         }
1738
1739         n = sd_listen_fds(true);
1740         if (n < 0) {
1741                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1742                 return n;
1743         }
1744
1745         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1746
1747                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1748
1749                         if (s->native_fd >= 0) {
1750                                 log_error("Too many native sockets passed.");
1751                                 return -EINVAL;
1752                         }
1753
1754                         s->native_fd = fd;
1755
1756                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1757
1758                         if (s->stdout_fd >= 0) {
1759                                 log_error("Too many stdout sockets passed.");
1760                                 return -EINVAL;
1761                         }
1762
1763                         s->stdout_fd = fd;
1764
1765                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1766
1767                         if (s->syslog_fd >= 0) {
1768                                 log_error("Too many /dev/log sockets passed.");
1769                                 return -EINVAL;
1770                         }
1771
1772                         s->syslog_fd = fd;
1773
1774                 } else {
1775                         log_error("Unknown socket passed.");
1776                         return -EINVAL;
1777                 }
1778         }
1779
1780         r = open_syslog_socket(s);
1781         if (r < 0)
1782                 return r;
1783
1784         r = open_native_socket(s);
1785         if (r < 0)
1786                 return r;
1787
1788         r = open_stdout_socket(s);
1789         if (r < 0)
1790                 return r;
1791
1792         r = system_journal_open(s);
1793         if (r < 0)
1794                 return r;
1795
1796         r = open_signalfd(s);
1797         if (r < 0)
1798                 return r;
1799
1800         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1801         if (!s->rate_limit)
1802                 return -ENOMEM;
1803
1804         return 0;
1805 }
1806
1807 static void server_done(Server *s) {
1808         JournalFile *f;
1809         assert(s);
1810
1811         while (s->stdout_streams)
1812                 stdout_stream_free(s->stdout_streams);
1813
1814         if (s->system_journal)
1815                 journal_file_close(s->system_journal);
1816
1817         if (s->runtime_journal)
1818                 journal_file_close(s->runtime_journal);
1819
1820         while ((f = hashmap_steal_first(s->user_journals)))
1821                 journal_file_close(f);
1822
1823         hashmap_free(s->user_journals);
1824
1825         if (s->epoll_fd >= 0)
1826                 close_nointr_nofail(s->epoll_fd);
1827
1828         if (s->signal_fd >= 0)
1829                 close_nointr_nofail(s->signal_fd);
1830
1831         if (s->syslog_fd >= 0)
1832                 close_nointr_nofail(s->syslog_fd);
1833
1834         if (s->native_fd >= 0)
1835                 close_nointr_nofail(s->native_fd);
1836
1837         if (s->stdout_fd >= 0)
1838                 close_nointr_nofail(s->stdout_fd);
1839
1840         if (s->rate_limit)
1841                 journal_rate_limit_free(s->rate_limit);
1842
1843         free(s->buffer);
1844 }
1845
1846 int main(int argc, char *argv[]) {
1847         Server server;
1848         int r;
1849
1850         /* if (getppid() != 1) { */
1851         /*         log_error("This program should be invoked by init only."); */
1852         /*         return EXIT_FAILURE; */
1853         /* } */
1854
1855         if (argc > 1) {
1856                 log_error("This program does not take arguments.");
1857                 return EXIT_FAILURE;
1858         }
1859
1860         log_set_target(LOG_TARGET_CONSOLE);
1861         log_parse_environment();
1862         log_open();
1863
1864         umask(0022);
1865
1866         r = server_init(&server);
1867         if (r < 0)
1868                 goto finish;
1869
1870         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1871
1872         sd_notify(false,
1873                   "READY=1\n"
1874                   "STATUS=Processing requests...");
1875
1876         server_vacuum(&server);
1877         server_flush_to_var(&server);
1878
1879         for (;;) {
1880                 struct epoll_event event;
1881
1882                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1883                 if (r < 0) {
1884
1885                         if (errno == EINTR)
1886                                 continue;
1887
1888                         log_error("epoll_wait() failed: %m");
1889                         r = -errno;
1890                         goto finish;
1891                 } else if (r == 0)
1892                         break;
1893
1894                 r = process_event(&server, &event);
1895                 if (r < 0)
1896                         goto finish;
1897                 else if (r == 0)
1898                         break;
1899         }
1900
1901         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1902
1903 finish:
1904         sd_notify(false,
1905                   "STATUS=Shutting down...");
1906
1907         server_done(&server);
1908
1909         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1910 }