chiark / gitweb /
journald: forward all syslog messages to syslogd
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43 #include "sd-journal.h"
44 #include "journal-internal.h"
45
/* Maximum number of per-user journal files kept open at once;
 * find_journal() closes arbitrary open ones once this is reached. */
#define USER_JOURNALS_MAX 1024
/* Upper bound on concurrently connected stdout streams.
 * NOTE(review): enforcement is not visible in this chunk — confirm. */
#define STDOUT_STREAMS_MAX 4096

/* Presumably the default rate limit: at most BURST messages per INTERVAL
 * (applied outside this chunk — confirm). */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* How long available_space() may return its cached value before it
 * re-scans the journal directory. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not used in this chunk; presumably how often to re-check
 * whether /var is available (cf. Server.var_available_timestamp). */
#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC)

/* NOTE(review): not used in this chunk; presumably a timeout for
 * talking to syslogd. */
#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC)

typedef struct StdoutStream StdoutStream;
59
/* Global state of the journal daemon. */
typedef struct Server {
        /* File descriptors; presumably the event loop, the signalfd and the
         * syslog, native-protocol and stdout listening sockets (their setup
         * is outside this chunk — confirm). */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* The runtime journal (used exclusively while it is open, see
         * find_journal()) and the system journal under /var. */
        JournalFile *runtime_journal;
        JournalFile *system_journal;
        /* Open per-user journal files, keyed by UINT32_TO_PTR(uid). */
        Hashmap *user_journals;

        /* Sequence number handed to journal_file_append_entry(). */
        uint64_t seqnum;

        /* NOTE(review): not used in this chunk; presumably a receive buffer. */
        char *buffer;
        size_t buffer_size;

        /* Consulted by dispatch_message() for every message. */
        JournalRateLimit *rate_limit;

        /* Copied into each per-user journal; keep_free is also honoured by
         * vacuuming and by available_space(). */
        JournalMetrics metrics;
        /* Upper bound for the combined size of the journal files. */
        uint64_t max_use;
        /* Copied into each per-user journal file. */
        bool compress;

        /* Last available_space() result and the CLOCK_MONOTONIC time it was
         * computed; resetting the timestamp to 0 forces a recheck. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* NOTE(review): not used in this chunk; presumably paired with
         * RECHECK_VAR_AVAILABLE_USEC. */
        uint64_t var_available_timestamp;

        /* Presumably the list of connected stdout streams and its length. */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
90
/* Parser state of a stdout stream. The first four lines a client sends
 * form a header, read in this order (see stdout_stream_line()); after
 * that every line is a log message. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,              /* expecting the tag line */
        STDOUT_STREAM_PRIORITY,         /* expecting the default priority (0-7) */
        STDOUT_STREAM_PRIORITY_PREFIX,  /* expecting 0/1: lines may carry "<N>" */
        STDOUT_STREAM_TEE_CONSOLE,      /* expecting 0/1: copy lines to /dev/console */
        STDOUT_STREAM_RUNNING           /* header done, logging lines */
} StdoutStreamState;
98
/* One client connection on the stdout logging socket. */
struct StdoutStream {
        Server *server;
        StdoutStreamState state;

        /* Descriptor the stream data is read from. */
        int fd;

        /* Credentials attached to every message logged from this stream. */
        struct ucred ucred;

        /* Header settings (see StdoutStreamState). tag may be NULL. */
        char *tag;
        int priority;
        bool priority_prefix:1;
        bool tee_console:1;

        /* Bytes read but not yet consumed as complete lines; holds at most
         * LINE_MAX bytes plus one spare byte. */
        char buffer[LINE_MAX+1];
        size_t length;

        LIST_FIELDS(StdoutStream, stdout_stream);
};

static int server_flush_to_var(Server *s);
119
120 static uint64_t available_space(Server *s) {
121         char ids[33];
122         sd_id128_t machine;
123         char *p;
124         const char *f;
125         struct statvfs ss;
126         uint64_t sum = 0, avail = 0, ss_avail = 0;
127         int r;
128         DIR *d;
129         usec_t ts = now(CLOCK_MONOTONIC);
130
131         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
132                 return s->cached_available_space;
133
134         r = sd_id128_get_machine(&machine);
135         if (r < 0)
136                 return 0;
137
138         if (s->system_journal)
139                 f = "/var/log/journal/";
140         else
141                 f = "/run/log/journal/";
142
143         p = strappend(f, sd_id128_to_string(machine, ids));
144         if (!p)
145                 return 0;
146
147         d = opendir(p);
148         free(p);
149
150         if (!d)
151                 return 0;
152
153         if (fstatvfs(dirfd(d), &ss) < 0)
154                 goto finish;
155
156         for (;;) {
157                 struct stat st;
158                 struct dirent buf, *de;
159                 int k;
160
161                 k = readdir_r(d, &buf, &de);
162                 if (k != 0) {
163                         r = -k;
164                         goto finish;
165                 }
166
167                 if (!de)
168                         break;
169
170                 if (!dirent_is_file_with_suffix(de, ".journal"))
171                         continue;
172
173                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
174                         continue;
175
176                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
177         }
178
179         avail = sum >= s->max_use ? 0 : s->max_use - sum;
180
181         ss_avail = ss.f_bsize * ss.f_bavail;
182
183         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
184
185         if (ss_avail < avail)
186                 avail = ss_avail;
187
188         s->cached_available_space = avail;
189         s->cached_available_space_timestamp = ts;
190
191 finish:
192         closedir(d);
193
194         return avail;
195 }
196
/* Resets the journal file's mode to 0640, owned by root:root. For a
 * non-root uid it also makes sure an ACL_USER entry for that uid exists
 * and grants read access, recomputing the ACL mask. Every failure is
 * logged as a warning and otherwise ignored. */
static void fix_perms(JournalFile *f, uid_t uid) {
        acl_permset_t perms;
        acl_entry_t user_entry;
        acl_t file_acl;
        int k;

        assert(f);

        k = fchmod_and_fchown(f->fd, 0640, 0, 0);
        if (k < 0)
                log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-k));

        /* root reads the file through the owner bits; no ACL needed */
        if (uid <= 0)
                return;

        file_acl = acl_get_fd(f->fd);
        if (!file_acl) {
                log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
                return;
        }

        /* Reuse an existing entry for uid, or add a fresh ACL_USER one;
         * then grant read and apply. Each step runs only if the previous
         * ones succeeded. */
        if (acl_find_uid(file_acl, uid, &user_entry) <= 0 &&
            (acl_create_entry(&file_acl, &user_entry) < 0 ||
             acl_set_tag_type(user_entry, ACL_USER) < 0 ||
             acl_set_qualifier(user_entry, &uid) < 0))
                log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
        else if (acl_get_permset(user_entry, &perms) < 0 ||
                 acl_add_perm(perms, ACL_READ) < 0 ||
                 acl_calc_mask(&file_acl) < 0)
                log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
        else if (acl_set_fd(f->fd, file_acl) < 0)
                log_warning("Failed to set ACL on %s, ignoring: %m", f->path);

        acl_free(file_acl);
}
242
243 static JournalFile* find_journal(Server *s, uid_t uid) {
244         char *p;
245         int r;
246         JournalFile *f;
247         char ids[33];
248         sd_id128_t machine;
249
250         assert(s);
251
252         /* We split up user logs only on /var, not on /run. If the
253          * runtime file is open, we write to it exclusively, in order
254          * to guarantee proper order as soon as we flush /run to
255          * /var and close the runtime file. */
256
257         if (s->runtime_journal)
258                 return s->runtime_journal;
259
260         if (uid <= 0)
261                 return s->system_journal;
262
263         r = sd_id128_get_machine(&machine);
264         if (r < 0)
265                 return s->system_journal;
266
267         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
268         if (f)
269                 return f;
270
271         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
272                 return s->system_journal;
273
274         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
275                 /* Too many open? Then let's close one */
276                 f = hashmap_steal_first(s->user_journals);
277                 assert(f);
278                 journal_file_close(f);
279         }
280
281         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
282         free(p);
283
284         if (r < 0)
285                 return s->system_journal;
286
287         fix_perms(f, uid);
288         f->metrics = s->metrics;
289         f->compress = s->compress;
290
291         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
292         if (r < 0) {
293                 journal_file_close(f);
294                 return s->system_journal;
295         }
296
297         return f;
298 }
299
300 static void server_vacuum(Server *s) {
301         Iterator i;
302         void *k;
303         char *p;
304         char ids[33];
305         sd_id128_t machine;
306         int r;
307         JournalFile *f;
308
309         log_info("Rotating...");
310
311         if (s->runtime_journal) {
312                 r = journal_file_rotate(&s->runtime_journal);
313                 if (r < 0)
314                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
315         }
316
317         if (s->system_journal) {
318                 r = journal_file_rotate(&s->system_journal);
319                 if (r < 0)
320                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
321         }
322
323         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
324                 r = journal_file_rotate(&f);
325                 if (r < 0)
326                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
327                 else
328                         hashmap_replace(s->user_journals, k, f);
329         }
330
331         log_info("Vacuuming...");
332
333         r = sd_id128_get_machine(&machine);
334         if (r < 0) {
335                 log_error("Failed to get machine ID: %s", strerror(-r));
336                 return;
337         }
338
339         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
340                 log_error("Out of memory.");
341                 return;
342         }
343
344         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
345         if (r < 0 && r != -ENOENT)
346                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
347         free(p);
348
349         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
350                 log_error("Out of memory.");
351                 return;
352         }
353
354         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
355         if (r < 0 && r != -ENOENT)
356                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
357         free(p);
358
359         s->cached_available_space_timestamp = 0;
360 }
361
362 static char *shortened_cgroup_path(pid_t pid) {
363         int r;
364         char *process_path, *init_path, *path;
365
366         assert(pid > 0);
367
368         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
369         if (r < 0)
370                 return NULL;
371
372         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
373         if (r < 0) {
374                 free(process_path);
375                 return NULL;
376         }
377
378         if (streq(init_path, "/"))
379                 init_path[0] = 0;
380
381         if (startswith(process_path, init_path))
382                 path = process_path + strlen(init_path);
383         else
384                 path = process_path;
385
386         free(init_path);
387
388         return path;
389 }
390
391 static void dispatch_message_real(Server *s,
392                              struct iovec *iovec, unsigned n, unsigned m,
393                              struct ucred *ucred,
394                              struct timeval *tv) {
395
396         char *pid = NULL, *uid = NULL, *gid = NULL,
397                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
398                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
399                 *audit_session = NULL, *audit_loginuid = NULL,
400                 *exe = NULL, *cgroup = NULL;
401
402         char idbuf[33];
403         sd_id128_t id;
404         int r;
405         char *t;
406         uid_t loginuid = 0, realuid = 0;
407         JournalFile *f;
408         bool vacuumed = false;
409
410         assert(s);
411         assert(iovec);
412         assert(n > 0);
413         assert(n + 13 <= m);
414
415         if (ucred) {
416                 uint32_t session;
417                 char *path;
418
419                 realuid = ucred->uid;
420
421                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
422                         IOVEC_SET_STRING(iovec[n++], pid);
423
424                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
425                         IOVEC_SET_STRING(iovec[n++], uid);
426
427                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
428                         IOVEC_SET_STRING(iovec[n++], gid);
429
430                 r = get_process_comm(ucred->pid, &t);
431                 if (r >= 0) {
432                         comm = strappend("_COMM=", t);
433                         if (comm)
434                                 IOVEC_SET_STRING(iovec[n++], comm);
435                         free(t);
436                 }
437
438                 r = get_process_exe(ucred->pid, &t);
439                 if (r >= 0) {
440                         exe = strappend("_EXE=", t);
441                         if (comm)
442                                 IOVEC_SET_STRING(iovec[n++], exe);
443                         free(t);
444                 }
445
446                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
447                 if (r >= 0) {
448                         cmdline = strappend("_CMDLINE=", t);
449                         if (cmdline)
450                                 IOVEC_SET_STRING(iovec[n++], cmdline);
451                         free(t);
452                 }
453
454                 r = audit_session_from_pid(ucred->pid, &session);
455                 if (r >= 0)
456                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
457                                 IOVEC_SET_STRING(iovec[n++], audit_session);
458
459                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
460                 if (r >= 0)
461                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
462                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
463
464                 path = shortened_cgroup_path(ucred->pid);
465                 if (path) {
466                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
467                         if (cgroup)
468                                 IOVEC_SET_STRING(iovec[n++], cgroup);
469
470                         free(path);
471                 }
472         }
473
474         if (tv) {
475                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
476                              (unsigned long long) timeval_load(tv)) >= 0)
477                         IOVEC_SET_STRING(iovec[n++], source_time);
478         }
479
480         /* Note that strictly speaking storing the boot id here is
481          * redundant since the entry includes this in-line
482          * anyway. However, we need this indexed, too. */
483         r = sd_id128_get_boot(&id);
484         if (r >= 0)
485                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
486                         IOVEC_SET_STRING(iovec[n++], boot_id);
487
488         r = sd_id128_get_machine(&id);
489         if (r >= 0)
490                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
491                         IOVEC_SET_STRING(iovec[n++], machine_id);
492
493         t = gethostname_malloc();
494         if (t) {
495                 hostname = strappend("_HOSTNAME=", t);
496                 if (hostname)
497                         IOVEC_SET_STRING(iovec[n++], hostname);
498                 free(t);
499         }
500
501         assert(n <= m);
502
503         server_flush_to_var(s);
504
505 retry:
506         f = find_journal(s, realuid == 0 ? 0 : loginuid);
507         if (!f)
508                 log_warning("Dropping message, as we can't find a place to store the data.");
509         else {
510                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
511
512                 if (r == -E2BIG && !vacuumed) {
513                         log_info("Allocation limit reached.");
514
515                         server_vacuum(s);
516                         vacuumed = true;
517
518                         log_info("Retrying write.");
519                         goto retry;
520                 }
521
522                 if (r < 0)
523                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
524         }
525
526         free(pid);
527         free(uid);
528         free(gid);
529         free(comm);
530         free(exe);
531         free(cmdline);
532         free(source_time);
533         free(boot_id);
534         free(machine_id);
535         free(hostname);
536         free(audit_session);
537         free(audit_loginuid);
538         free(cgroup);
539 }
540
541 static void dispatch_message(Server *s,
542                              struct iovec *iovec, unsigned n, unsigned m,
543                              struct ucred *ucred,
544                              struct timeval *tv,
545                              int priority) {
546         int rl;
547         char *path, *c;
548
549         assert(s);
550         assert(iovec || n == 0);
551
552         if (n == 0)
553                 return;
554
555         if (!ucred)
556                 goto finish;
557
558         path = shortened_cgroup_path(ucred->pid);
559         if (!path)
560                 goto finish;
561
562         /* example: /user/lennart/3/foobar
563          *          /system/dbus.service/foobar
564          *
565          * So let's cut of everything past the third /, since that is
566          * wher user directories start */
567
568         c = strchr(path, '/');
569         if (c) {
570                 c = strchr(c+1, '/');
571                 if (c) {
572                         c = strchr(c+1, '/');
573                         if (c)
574                                 *c = 0;
575                 }
576         }
577
578         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
579
580         if (rl == 0) {
581                 free(path);
582                 return;
583         }
584
585         if (rl > 1) {
586                 int j = 0;
587                 char suppress_message[LINE_MAX];
588                 struct iovec suppress_iovec[15];
589
590                 /* Write a suppression message if we suppressed something */
591
592                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
593                 char_array_0(suppress_message);
594
595                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
596                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
597
598                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
599         }
600
601         free(path);
602
603 finish:
604         dispatch_message_real(s, iovec, n, m, ucred, tv);
605 }
606
607 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
608         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
609         struct iovec iovec[16];
610         unsigned n = 0;
611         int priority = LOG_USER | LOG_INFO;
612
613         assert(s);
614         assert(buf);
615
616         parse_syslog_priority((char**) &buf, &priority);
617         skip_syslog_date((char**) &buf);
618
619         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
620                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
621
622         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
623                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
624
625         message = strappend("MESSAGE=", buf);
626         if (message)
627                 IOVEC_SET_STRING(iovec[n++], message);
628
629         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
630
631         free(message);
632         free(syslog_facility);
633         free(syslog_priority);
634 }
635
/* Returns whether the l bytes at p form an acceptable client-supplied
 * field name. The name must:
 *  - be 1..64 characters long,
 *  - use only A-Z, 0-9 and '_',
 *  - not start with a digit,
 *  - not start with '_' (underscore names are protected: they are the
 *    trusted fields journald adds itself, e.g. _PID=).
 * These rules loosely follow the POSIX recommendations for environment
 * variable names, with a few extra restrictions:
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                char c = p[i];
                bool upper = c >= 'A' && c <= 'Z';
                bool digit = c >= '0' && c <= '9';

                if (!upper && !digit && c != '_')
                        return false;
        }

        return true;
}
670
671 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
672         struct iovec *iovec = NULL;
673         unsigned n = 0, m = 0, j;
674         const char *p;
675         size_t remaining;
676         int priority = LOG_INFO;
677
678         assert(s);
679         assert(buffer || n == 0);
680
681         p = buffer;
682         remaining = buffer_size;
683
684         while (remaining > 0) {
685                 const char *e, *q;
686
687                 e = memchr(p, '\n', remaining);
688
689                 if (!e) {
690                         /* Trailing noise, let's ignore it, and flush what we collected */
691                         log_debug("Received message with trailing noise, ignoring.");
692                         break;
693                 }
694
695                 if (e == p) {
696                         /* Entry separator */
697                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
698                         n = 0;
699                         priority = LOG_INFO;
700
701                         p++;
702                         remaining--;
703                         continue;
704                 }
705
706                 if (*p == '.' || *p == '#') {
707                         /* Ignore control commands for now, and
708                          * comments too. */
709                         remaining -= (e - p) + 1;
710                         p = e + 1;
711                         continue;
712                 }
713
714                 /* A property follows */
715
716                 if (n+13 >= m) {
717                         struct iovec *c;
718                         unsigned u;
719
720                         u = MAX((n+13U) * 2U, 4U);
721                         c = realloc(iovec, u * sizeof(struct iovec));
722                         if (!c) {
723                                 log_error("Out of memory");
724                                 break;
725                         }
726
727                         iovec = c;
728                         m = u;
729                 }
730
731                 q = memchr(p, '=', e - p);
732                 if (q) {
733                         if (valid_user_field(p, q - p)) {
734                                 /* If the field name starts with an
735                                  * underscore, skip the variable,
736                                  * since that indidates a trusted
737                                  * field */
738                                 iovec[n].iov_base = (char*) p;
739                                 iovec[n].iov_len = e - p;
740                                 n++;
741
742                                 /* We need to determine the priority
743                                  * of this entry for the rate limiting
744                                  * logic */
745                                 if (e - p == 10 &&
746                                     memcmp(p, "PRIORITY=", 10) == 0 &&
747                                     p[10] >= '0' &&
748                                     p[10] <= '9')
749                                         priority = p[10] - '0';
750                         }
751
752                         remaining -= (e - p) + 1;
753                         p = e + 1;
754                         continue;
755                 } else {
756                         uint64_t l;
757                         char *k;
758
759                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
760                                 log_debug("Failed to parse message, ignoring.");
761                                 break;
762                         }
763
764                         memcpy(&l, e + 1, sizeof(uint64_t));
765                         l = le64toh(l);
766
767                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
768                             e[1+sizeof(uint64_t)+l] != '\n') {
769                                 log_debug("Failed to parse message, ignoring.");
770                                 break;
771                         }
772
773                         k = malloc((e - p) + 1 + l);
774                         if (!k) {
775                                 log_error("Out of memory");
776                                 break;
777                         }
778
779                         memcpy(k, p, e - p);
780                         k[e - p] = '=';
781                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
782
783                         if (valid_user_field(p, e - p)) {
784                                 iovec[n].iov_base = k;
785                                 iovec[n].iov_len = (e - p) + 1 + l;
786                                 n++;
787                         } else
788                                 free(k);
789
790                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
791                         p = e + 1 + sizeof(uint64_t) + l + 1;
792                 }
793         }
794
795         dispatch_message(s, iovec, n, m, ucred, tv, priority);
796
797         for (j = 0; j < n; j++)
798                 if (iovec[j].iov_base < buffer ||
799                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
800                         free(iovec[j].iov_base);
801 }
802
803 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
804         struct iovec iovec[15];
805         char *message = NULL, *syslog_priority = NULL;
806         unsigned n = 0;
807         size_t tag_len;
808         int priority;
809
810         assert(s);
811         assert(p);
812
813         priority = s->priority;
814
815         if (s->priority_prefix &&
816             l > 3 &&
817             p[0] == '<' &&
818             p[1] >= '0' && p[1] <= '7' &&
819             p[2] == '>') {
820
821                 priority = p[1] - '0';
822                 p += 3;
823                 l -= 3;
824         }
825
826         if (l <= 0)
827                 return 0;
828
829         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
830                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
831
832         tag_len = s->tag ? strlen(s->tag) + 2: 0;
833         message = malloc(8 + tag_len + l);
834         if (message) {
835                 memcpy(message, "MESSAGE=", 8);
836
837                 if (s->tag) {
838                         memcpy(message+8, s->tag, tag_len-2);
839                         memcpy(message+8+tag_len-2, ": ", 2);
840                 }
841
842                 memcpy(message+8+tag_len, p, l);
843                 iovec[n].iov_base = message;
844                 iovec[n].iov_len = 8+tag_len+l;
845                 n++;
846         }
847
848         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
849
850         if (s->tee_console) {
851                 int console;
852
853                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
854                 if (console >= 0) {
855                         n = 0;
856                         if (s->tag) {
857                                 IOVEC_SET_STRING(iovec[n++], s->tag);
858                                 IOVEC_SET_STRING(iovec[n++], ": ");
859                         }
860
861                         iovec[n].iov_base = (void*) p;
862                         iovec[n].iov_len = l;
863                         n++;
864
865                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
866
867                         writev(console, iovec, n);
868                 }
869         }
870
871         free(message);
872         free(syslog_priority);
873
874         return 0;
875 }
876
877 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
878         assert(s);
879         assert(p);
880
881         while (l > 0 && strchr(WHITESPACE, *p)) {
882                 l--;
883                 p++;
884         }
885
886         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
887                 l--;
888
889         switch (s->state) {
890
891         case STDOUT_STREAM_TAG:
892
893                 if (l > 0) {
894                         s->tag = strndup(p, l);
895                         if (!s->tag) {
896                                 log_error("Out of memory");
897                                 return -EINVAL;
898                         }
899                 }
900
901                 s->state = STDOUT_STREAM_PRIORITY;
902                 return 0;
903
904         case STDOUT_STREAM_PRIORITY:
905                 if (l != 1 || *p < '0' || *p > '7') {
906                         log_warning("Failed to parse log priority line.");
907                         return -EINVAL;
908                 }
909
910                 s->priority = *p - '0';
911                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
912                 return 0;
913
914         case STDOUT_STREAM_PRIORITY_PREFIX:
915                 if (l != 1 || *p < '0' || *p > '1') {
916                         log_warning("Failed to parse priority prefix line.");
917                         return -EINVAL;
918                 }
919
920                 s->priority_prefix = *p - '0';
921                 s->state = STDOUT_STREAM_TEE_CONSOLE;
922                 return 0;
923
924         case STDOUT_STREAM_TEE_CONSOLE:
925                 if (l != 1 || *p < '0' || *p > '1') {
926                         log_warning("Failed to parse tee to console line.");
927                         return -EINVAL;
928                 }
929
930                 s->tee_console = *p - '0';
931                 s->state = STDOUT_STREAM_RUNNING;
932                 return 0;
933
934         case STDOUT_STREAM_RUNNING:
935                 return stdout_stream_log(s, p, l);
936         }
937
938         assert_not_reached("Unknown stream state");
939 }
940
941 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
942         char *p;
943         size_t remaining;
944         int r;
945
946         assert(s);
947
948         p = s->buffer;
949         remaining = s->length;
950         for (;;) {
951                 char *end;
952                 size_t skip;
953
954                 end = memchr(p, '\n', remaining);
955                 if (!end) {
956                         if (remaining >= LINE_MAX) {
957                                 end = p + LINE_MAX;
958                                 skip = LINE_MAX;
959                         } else
960                                 break;
961                 } else
962                         skip = end - p + 1;
963
964                 r = stdout_stream_line(s, p, end - p);
965                 if (r < 0)
966                         return r;
967
968                 remaining -= skip;
969                 p += skip;
970         }
971
972         if (force_flush && remaining > 0) {
973                 r = stdout_stream_line(s, p, remaining);
974                 if (r < 0)
975                         return r;
976
977                 p += remaining;
978                 remaining = 0;
979         }
980
981         if (p > s->buffer) {
982                 memmove(s->buffer, p, remaining);
983                 s->length = remaining;
984         }
985
986         return 0;
987 }
988
989 static int stdout_stream_process(StdoutStream *s) {
990         ssize_t l;
991         int r;
992
993         assert(s);
994
995         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
996         if (l < 0) {
997
998                 if (errno == EAGAIN)
999                         return 0;
1000
1001                 log_warning("Failed to read from stream: %m");
1002                 return -errno;
1003         }
1004
1005         if (l == 0) {
1006                 r = stdout_stream_scan(s, true);
1007                 if (r < 0)
1008                         return r;
1009
1010                 return 0;
1011         }
1012
1013         s->length += l;
1014         r = stdout_stream_scan(s, false);
1015         if (r < 0)
1016                 return r;
1017
1018         return 1;
1019
1020 }
1021
1022 static void stdout_stream_free(StdoutStream *s) {
1023         assert(s);
1024
1025         if (s->server) {
1026                 assert(s->server->n_stdout_streams > 0);
1027                 s->server->n_stdout_streams --;
1028                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1029         }
1030
1031         if (s->fd >= 0) {
1032                 if (s->server)
1033                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1034
1035                 close_nointr_nofail(s->fd);
1036         }
1037
1038         free(s->tag);
1039         free(s);
1040 }
1041
1042 static int stdout_stream_new(Server *s) {
1043         StdoutStream *stream;
1044         int fd, r;
1045         socklen_t len;
1046         struct epoll_event ev;
1047
1048         assert(s);
1049
1050         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1051         if (fd < 0) {
1052                 if (errno == EAGAIN)
1053                         return 0;
1054
1055                 log_error("Failed to accept stdout connection: %m");
1056                 return -errno;
1057         }
1058
1059         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1060                 log_warning("Too many stdout streams, refusing connection.");
1061                 close_nointr_nofail(fd);
1062                 return 0;
1063         }
1064
1065         stream = new0(StdoutStream, 1);
1066         if (!stream) {
1067                 log_error("Out of memory.");
1068                 close_nointr_nofail(fd);
1069                 return -ENOMEM;
1070         }
1071
1072         stream->fd = fd;
1073
1074         len = sizeof(stream->ucred);
1075         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1076                 log_error("Failed to determine peer credentials: %m");
1077                 r = -errno;
1078                 goto fail;
1079         }
1080
1081         if (shutdown(fd, SHUT_WR) < 0) {
1082                 log_error("Failed to shutdown writing side of socket: %m");
1083                 r = -errno;
1084                 goto fail;
1085         }
1086
1087         zero(ev);
1088         ev.data.ptr = stream;
1089         ev.events = EPOLLIN;
1090         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1091                 log_error("Failed to add stream to event loop: %m");
1092                 r = -errno;
1093                 goto fail;
1094         }
1095
1096         stream->server = s;
1097         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1098         s->n_stdout_streams ++;
1099
1100         return 0;
1101
1102 fail:
1103         stdout_stream_free(stream);
1104         return r;
1105 }
1106
1107 static int system_journal_open(Server *s) {
1108         int r;
1109         char *fn;
1110         sd_id128_t machine;
1111         char ids[33];
1112
1113         r = sd_id128_get_machine(&machine);
1114         if (r < 0)
1115                 return r;
1116
1117         sd_id128_to_string(machine, ids);
1118
1119         if (!s->system_journal) {
1120
1121                 /* First try to create the machine path, but not the prefix */
1122                 fn = strappend("/var/log/journal/", ids);
1123                 if (!fn)
1124                         return -ENOMEM;
1125                 (void) mkdir(fn, 0755);
1126                 free(fn);
1127
1128                 /* The create the system journal file */
1129                 fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1130                 if (!fn)
1131                         return -ENOMEM;
1132
1133                 r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1134                 free(fn);
1135
1136                 if (r >= 0) {
1137                         s->system_journal->metrics = s->metrics;
1138                         s->system_journal->compress = s->compress;
1139
1140                         fix_perms(s->system_journal, 0);
1141                 } else if (r < 0) {
1142
1143                         if (r == -ENOENT)
1144                                 r = 0;
1145                         else {
1146                                 log_error("Failed to open system journal: %s", strerror(-r));
1147                                 return r;
1148                         }
1149                 }
1150         }
1151
1152         if (!s->runtime_journal) {
1153
1154                 fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1155                 if (!fn)
1156                         return -ENOMEM;
1157
1158                 if (s->system_journal) {
1159
1160                         /* Try to open the runtime journal, but only
1161                          * if it already exists, so that we can flush
1162                          * it into the system journal */
1163
1164                         r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal);
1165                         free(fn);
1166
1167                         if (r < 0) {
1168
1169                                 if (r == -ENOENT)
1170                                         r = 0;
1171                                 else {
1172                                         log_error("Failed to open runtime journal: %s", strerror(-r));
1173                                         return r;
1174                                 }
1175                         }
1176
1177                 } else {
1178
1179                         /* OK, we really need the runtime journal, so create
1180                          * it if necessary. */
1181
1182                         (void) mkdir_parents(fn, 0755);
1183                         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1184                         free(fn);
1185
1186                         if (r < 0) {
1187                                 log_error("Failed to open runtime journal: %s", strerror(-r));
1188                                 return r;
1189                         }
1190                 }
1191
1192                 if (s->runtime_journal) {
1193                         s->runtime_journal->metrics = s->metrics;
1194                         s->runtime_journal->compress = s->compress;
1195
1196                         fix_perms(s->runtime_journal, 0);
1197                 }
1198         }
1199
1200         return r;
1201 }
1202
1203 static int server_flush_to_var(Server *s) {
1204         char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";
1205         Object *o = NULL;
1206         int r;
1207         sd_id128_t machine;
1208         sd_journal *j;
1209         usec_t ts;
1210
1211         assert(s);
1212
1213         if (!s->runtime_journal)
1214                 return 0;
1215
1216         ts = now(CLOCK_MONOTONIC);
1217         if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts)
1218                 return 0;
1219
1220         s->var_available_timestamp = ts;
1221
1222         system_journal_open(s);
1223
1224         if (!s->system_journal)
1225                 return 0;
1226
1227         r = sd_id128_get_machine(&machine);
1228         if (r < 0) {
1229                 log_error("Failed to get machine id: %s", strerror(-r));
1230                 return r;
1231         }
1232
1233         r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY);
1234         if (r < 0) {
1235                 log_error("Failed to read runtime journal: %s", strerror(-r));
1236                 return r;
1237         }
1238
1239         SD_JOURNAL_FOREACH(j) {
1240                 JournalFile *f;
1241
1242                 f = j->current_file;
1243                 assert(f && f->current_offset > 0);
1244
1245                 r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o);
1246                 if (r < 0) {
1247                         log_error("Can't read entry: %s", strerror(-r));
1248                         goto finish;
1249                 }
1250
1251                 r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1252                 if (r == -E2BIG) {
1253                         log_info("Allocation limit reached.");
1254
1255                         journal_file_post_change(s->system_journal);
1256                         server_vacuum(s);
1257
1258                         r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL);
1259                 }
1260
1261                 if (r < 0) {
1262                         log_error("Can't write entry: %s", strerror(-r));
1263                         goto finish;
1264                 }
1265         }
1266
1267 finish:
1268         journal_file_post_change(s->system_journal);
1269
1270         journal_file_close(s->runtime_journal);
1271         s->runtime_journal = NULL;
1272
1273         if (r >= 0) {
1274                 sd_id128_to_string(machine, path + 17);
1275                 rm_rf(path, false, true, false);
1276         }
1277
1278         return r;
1279 }
1280
1281 static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) {
1282         struct msghdr msghdr;
1283         struct iovec iovec;
1284         struct cmsghdr *cmsg;
1285         union {
1286                 struct cmsghdr cmsghdr;
1287                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1288                             CMSG_SPACE(sizeof(struct timeval))];
1289         } control;
1290         union sockaddr_union sa;
1291
1292         assert(s);
1293
1294         zero(msghdr);
1295
1296         zero(iovec);
1297         iovec.iov_base = (void*) buffer;
1298         iovec.iov_len = length;
1299         msghdr.msg_iov = &iovec;
1300         msghdr.msg_iovlen = 1;
1301
1302         zero(sa);
1303         sa.un.sun_family = AF_UNIX;
1304         strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1305         msghdr.msg_name = &sa;
1306         msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path);
1307
1308         zero(control);
1309         msghdr.msg_control = &control;
1310         msghdr.msg_controllen = sizeof(control);
1311
1312         cmsg = CMSG_FIRSTHDR(&msghdr);
1313         cmsg->cmsg_level = SOL_SOCKET;
1314         cmsg->cmsg_type = SCM_CREDENTIALS;
1315         cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred));
1316         memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred));
1317         msghdr.msg_controllen = cmsg->cmsg_len;
1318
1319         /* Forward the syslog message we received via /dev/log to
1320          * /run/systemd/syslog. Unfortunately we currently can't set
1321          * the SO_TIMESTAMP auxiliary data, and hence we don't. */
1322
1323         if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1324                 return;
1325
1326         if (errno == ESRCH) {
1327                 struct ucred u;
1328
1329                 /* Hmm, presumably the sender process vanished
1330                  * by now, so let's fix it as good as we
1331                  * can, and retry */
1332
1333                 u = *ucred;
1334                 u.pid = getpid();
1335                 memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred));
1336
1337                 if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0)
1338                         return;
1339         }
1340
1341         log_debug("Failed to forward syslog message: %m");
1342 }
1343
1344 static int process_event(Server *s, struct epoll_event *ev) {
1345         assert(s);
1346
1347         if (ev->data.fd == s->signal_fd) {
1348                 struct signalfd_siginfo sfsi;
1349                 ssize_t n;
1350
1351                 if (ev->events != EPOLLIN) {
1352                         log_info("Got invalid event from epoll.");
1353                         return -EIO;
1354                 }
1355
1356                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1357                 if (n != sizeof(sfsi)) {
1358
1359                         if (n >= 0)
1360                                 return -EIO;
1361
1362                         if (errno == EINTR || errno == EAGAIN)
1363                                 return 0;
1364
1365                         return -errno;
1366                 }
1367
1368                 if (sfsi.ssi_signo == SIGUSR1) {
1369                         server_flush_to_var(s);
1370                         return 0;
1371                 }
1372
1373                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1374                 return 0;
1375
1376         } else if (ev->data.fd == s->native_fd ||
1377                    ev->data.fd == s->syslog_fd) {
1378
1379                 if (ev->events != EPOLLIN) {
1380                         log_info("Got invalid event from epoll.");
1381                         return -EIO;
1382                 }
1383
1384                 for (;;) {
1385                         struct msghdr msghdr;
1386                         struct iovec iovec;
1387                         struct ucred *ucred = NULL;
1388                         struct timeval *tv = NULL;
1389                         struct cmsghdr *cmsg;
1390                         union {
1391                                 struct cmsghdr cmsghdr;
1392                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1393                                             CMSG_SPACE(sizeof(struct timeval))];
1394                         } control;
1395                         ssize_t n;
1396                         int v;
1397
1398                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1399                                 log_error("SIOCINQ failed: %m");
1400                                 return -errno;
1401                         }
1402
1403                         if (v <= 0)
1404                                 return 1;
1405
1406                         if (s->buffer_size < (size_t) v) {
1407                                 void *b;
1408                                 size_t l;
1409
1410                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1411                                 b = realloc(s->buffer, l+1);
1412
1413                                 if (!b) {
1414                                         log_error("Couldn't increase buffer.");
1415                                         return -ENOMEM;
1416                                 }
1417
1418                                 s->buffer_size = l;
1419                                 s->buffer = b;
1420                         }
1421
1422                         zero(iovec);
1423                         iovec.iov_base = s->buffer;
1424                         iovec.iov_len = s->buffer_size;
1425
1426                         zero(control);
1427                         zero(msghdr);
1428                         msghdr.msg_iov = &iovec;
1429                         msghdr.msg_iovlen = 1;
1430                         msghdr.msg_control = &control;
1431                         msghdr.msg_controllen = sizeof(control);
1432
1433                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1434                         if (n < 0) {
1435
1436                                 if (errno == EINTR || errno == EAGAIN)
1437                                         return 1;
1438
1439                                 log_error("recvmsg() failed: %m");
1440                                 return -errno;
1441                         }
1442
1443                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1444
1445                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1446                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1447                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1448                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1449                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1450                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1451                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1452                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1453                         }
1454
1455                         if (ev->data.fd == s->syslog_fd) {
1456                                 char *e;
1457
1458                                 e = memchr(s->buffer, '\n', n);
1459                                 if (e)
1460                                         *e = 0;
1461                                 else
1462                                         s->buffer[n] = 0;
1463
1464                                 forward_syslog(s, s->buffer, n, ucred, tv);
1465                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1466                         } else
1467                                 process_native_message(s, s->buffer, n, ucred, tv);
1468                 }
1469
1470                 return 1;
1471
1472         } else if (ev->data.fd == s->stdout_fd) {
1473
1474                 if (ev->events != EPOLLIN) {
1475                         log_info("Got invalid event from epoll.");
1476                         return -EIO;
1477                 }
1478
1479                 stdout_stream_new(s);
1480                 return 1;
1481
1482         } else {
1483                 StdoutStream *stream;
1484
1485                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1486                         log_info("Got invalid event from epoll.");
1487                         return -EIO;
1488                 }
1489
1490                 /* If it is none of the well-known fds, it must be an
1491                  * stdout stream fd. Note that this is a bit ugly here
1492                  * (since we rely that none of the well-known fds
1493                  * could be interpreted as pointer), but nonetheless
1494                  * safe, since the well-known fds would never get an
1495                  * fd > 4096, i.e. beyond the first memory page */
1496
1497                 stream = ev->data.ptr;
1498
1499                 if (stdout_stream_process(stream) <= 0)
1500                         stdout_stream_free(stream);
1501
1502                 return 1;
1503         }
1504
1505         log_error("Unknown event.");
1506         return 0;
1507 }
1508
1509 static int open_syslog_socket(Server *s) {
1510         union sockaddr_union sa;
1511         int one, r;
1512         struct epoll_event ev;
1513         struct timeval tv;
1514
1515         assert(s);
1516
1517         if (s->syslog_fd < 0) {
1518
1519                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1520                 if (s->syslog_fd < 0) {
1521                         log_error("socket() failed: %m");
1522                         return -errno;
1523                 }
1524
1525                 zero(sa);
1526                 sa.un.sun_family = AF_UNIX;
1527                 strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path));
1528
1529                 unlink(sa.un.sun_path);
1530
1531                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1532                 if (r < 0) {
1533                         log_error("bind() failed: %m");
1534                         return -errno;
1535                 }
1536
1537                 chmod(sa.un.sun_path, 0666);
1538         }
1539
1540         one = 1;
1541         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1542         if (r < 0) {
1543                 log_error("SO_PASSCRED failed: %m");
1544                 return -errno;
1545         }
1546
1547         one = 1;
1548         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1549         if (r < 0) {
1550                 log_error("SO_TIMESTAMP failed: %m");
1551                 return -errno;
1552         }
1553
1554         /* Since we use the same socket for forwarding this to some
1555          * other syslog implementation, make sure we don't hang
1556          * forever */
1557         timeval_store(&tv, SYSLOG_TIMEOUT_USEC);
1558         if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) {
1559                 log_error("SO_SNDTIMEO failed: %m");
1560                 return -errno;
1561         }
1562
1563         zero(ev);
1564         ev.events = EPOLLIN;
1565         ev.data.fd = s->syslog_fd;
1566         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1567                 log_error("Failed to add syslog server fd to epoll object: %m");
1568                 return -errno;
1569         }
1570
1571         return 0;
1572 }
1573
1574 static int open_native_socket(Server*s) {
1575         union sockaddr_union sa;
1576         int one, r;
1577         struct epoll_event ev;
1578
1579         assert(s);
1580
1581         if (s->native_fd < 0) {
1582
1583                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1584                 if (s->native_fd < 0) {
1585                         log_error("socket() failed: %m");
1586                         return -errno;
1587                 }
1588
1589                 zero(sa);
1590                 sa.un.sun_family = AF_UNIX;
1591                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1592
1593                 unlink(sa.un.sun_path);
1594
1595                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1596                 if (r < 0) {
1597                         log_error("bind() failed: %m");
1598                         return -errno;
1599                 }
1600
1601                 chmod(sa.un.sun_path, 0666);
1602         }
1603
1604         one = 1;
1605         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1606         if (r < 0) {
1607                 log_error("SO_PASSCRED failed: %m");
1608                 return -errno;
1609         }
1610
1611         one = 1;
1612         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1613         if (r < 0) {
1614                 log_error("SO_TIMESTAMP failed: %m");
1615                 return -errno;
1616         }
1617
1618         zero(ev);
1619         ev.events = EPOLLIN;
1620         ev.data.fd = s->native_fd;
1621         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1622                 log_error("Failed to add native server fd to epoll object: %m");
1623                 return -errno;
1624         }
1625
1626         return 0;
1627 }
1628
1629 static int open_stdout_socket(Server *s) {
1630         union sockaddr_union sa;
1631         int r;
1632         struct epoll_event ev;
1633
1634         assert(s);
1635
1636         if (s->stdout_fd < 0) {
1637
1638                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1639                 if (s->stdout_fd < 0) {
1640                         log_error("socket() failed: %m");
1641                         return -errno;
1642                 }
1643
1644                 zero(sa);
1645                 sa.un.sun_family = AF_UNIX;
1646                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1647
1648                 unlink(sa.un.sun_path);
1649
1650                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1651                 if (r < 0) {
1652                         log_error("bind() failed: %m");
1653                         return -errno;
1654                 }
1655
1656                 chmod(sa.un.sun_path, 0666);
1657
1658                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1659                         log_error("liste() failed: %m");
1660                         return -errno;
1661                 }
1662         }
1663
1664         zero(ev);
1665         ev.events = EPOLLIN;
1666         ev.data.fd = s->stdout_fd;
1667         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1668                 log_error("Failed to add stdout server fd to epoll object: %m");
1669                 return -errno;
1670         }
1671
1672         return 0;
1673 }
1674
1675 static int open_signalfd(Server *s) {
1676         sigset_t mask;
1677         struct epoll_event ev;
1678
1679         assert(s);
1680
1681         assert_se(sigemptyset(&mask) == 0);
1682         sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);
1683         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1684
1685         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1686         if (s->signal_fd < 0) {
1687                 log_error("signalfd(): %m");
1688                 return -errno;
1689         }
1690
1691         zero(ev);
1692         ev.events = EPOLLIN;
1693         ev.data.fd = s->signal_fd;
1694
1695         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1696                 log_error("epoll_ctl(): %m");
1697                 return -errno;
1698         }
1699
1700         return 0;
1701 }
1702
1703 static int server_init(Server *s) {
1704         int n, r, fd;
1705
1706         assert(s);
1707
1708         zero(*s);
1709         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1710         s->metrics.max_size = DEFAULT_MAX_SIZE;
1711         s->metrics.min_size = DEFAULT_MIN_SIZE;
1712         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1713         s->max_use = DEFAULT_MAX_USE;
1714         s->compress = true;
1715
1716         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1717         if (!s->user_journals) {
1718                 log_error("Out of memory.");
1719                 return -ENOMEM;
1720         }
1721
1722         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1723         if (s->epoll_fd < 0) {
1724                 log_error("Failed to create epoll object: %m");
1725                 return -errno;
1726         }
1727
1728         n = sd_listen_fds(true);
1729         if (n < 0) {
1730                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1731                 return n;
1732         }
1733
1734         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1735
1736                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1737
1738                         if (s->native_fd >= 0) {
1739                                 log_error("Too many native sockets passed.");
1740                                 return -EINVAL;
1741                         }
1742
1743                         s->native_fd = fd;
1744
1745                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1746
1747                         if (s->stdout_fd >= 0) {
1748                                 log_error("Too many stdout sockets passed.");
1749                                 return -EINVAL;
1750                         }
1751
1752                         s->stdout_fd = fd;
1753
1754                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1755
1756                         if (s->syslog_fd >= 0) {
1757                                 log_error("Too many /dev/log sockets passed.");
1758                                 return -EINVAL;
1759                         }
1760
1761                         s->syslog_fd = fd;
1762
1763                 } else {
1764                         log_error("Unknown socket passed.");
1765                         return -EINVAL;
1766                 }
1767         }
1768
1769         r = open_syslog_socket(s);
1770         if (r < 0)
1771                 return r;
1772
1773         r = open_native_socket(s);
1774         if (r < 0)
1775                 return r;
1776
1777         r = open_stdout_socket(s);
1778         if (r < 0)
1779                 return r;
1780
1781         r = system_journal_open(s);
1782         if (r < 0)
1783                 return r;
1784
1785         r = open_signalfd(s);
1786         if (r < 0)
1787                 return r;
1788
1789         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1790         if (!s->rate_limit)
1791                 return -ENOMEM;
1792
1793         return 0;
1794 }
1795
1796 static void server_done(Server *s) {
1797         JournalFile *f;
1798         assert(s);
1799
1800         while (s->stdout_streams)
1801                 stdout_stream_free(s->stdout_streams);
1802
1803         if (s->system_journal)
1804                 journal_file_close(s->system_journal);
1805
1806         if (s->runtime_journal)
1807                 journal_file_close(s->runtime_journal);
1808
1809         while ((f = hashmap_steal_first(s->user_journals)))
1810                 journal_file_close(f);
1811
1812         hashmap_free(s->user_journals);
1813
1814         if (s->epoll_fd >= 0)
1815                 close_nointr_nofail(s->epoll_fd);
1816
1817         if (s->signal_fd >= 0)
1818                 close_nointr_nofail(s->signal_fd);
1819
1820         if (s->syslog_fd >= 0)
1821                 close_nointr_nofail(s->syslog_fd);
1822
1823         if (s->native_fd >= 0)
1824                 close_nointr_nofail(s->native_fd);
1825
1826         if (s->stdout_fd >= 0)
1827                 close_nointr_nofail(s->stdout_fd);
1828
1829         if (s->rate_limit)
1830                 journal_rate_limit_free(s->rate_limit);
1831 }
1832
1833 int main(int argc, char *argv[]) {
1834         Server server;
1835         int r;
1836
1837         /* if (getppid() != 1) { */
1838         /*         log_error("This program should be invoked by init only."); */
1839         /*         return EXIT_FAILURE; */
1840         /* } */
1841
1842         if (argc > 1) {
1843                 log_error("This program does not take arguments.");
1844                 return EXIT_FAILURE;
1845         }
1846
1847         log_set_target(LOG_TARGET_CONSOLE);
1848         log_set_max_level(LOG_DEBUG);
1849         log_parse_environment();
1850         log_open();
1851
1852         umask(0022);
1853
1854         r = server_init(&server);
1855         if (r < 0)
1856                 goto finish;
1857
1858         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1859
1860         sd_notify(false,
1861                   "READY=1\n"
1862                   "STATUS=Processing requests...");
1863
1864         server_vacuum(&server);
1865         server_flush_to_var(&server);
1866
1867         for (;;) {
1868                 struct epoll_event event;
1869
1870                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1871                 if (r < 0) {
1872
1873                         if (errno == EINTR)
1874                                 continue;
1875
1876                         log_error("epoll_wait() failed: %m");
1877                         r = -errno;
1878                         goto finish;
1879                 } else if (r == 0)
1880                         break;
1881
1882                 r = process_event(&server, &event);
1883                 if (r < 0)
1884                         goto finish;
1885                 else if (r == 0)
1886                         break;
1887         }
1888
1889         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1890
1891 finish:
1892         sd_notify(false,
1893                   "STATUS=Shutting down...");
1894
1895         server_done(&server);
1896
1897         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1898 }