chiark / gitweb /
journald: increase rate limit burst rate
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43
/* Maximum number of per-user journal files kept open at the same time;
 * find_journal() closes open ones until the count is below this limit. */
#define USER_JOURNALS_MAX 1024
/* Upper bound for concurrently connected stdout streams; presumably
 * checked against Server.n_stdout_streams elsewhere in this file — the
 * check is not visible in this chunk. */
#define STDOUT_STREAMS_MAX 4096

/* Default rate limiting parameters (time interval and number of
 * messages allowed per interval). Where they are passed to the rate
 * limiter is not visible in this chunk. */
#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC)
#define DEFAULT_RATE_LIMIT_BURST 200

/* available_space() returns its cached result until this much
 * CLOCK_MONOTONIC time has passed since the last computation. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

typedef struct StdoutStream StdoutStream;
53
/* Global state of the journal daemon. */
typedef struct Server {
        /* epoll instance, signalfd and the listening descriptors for the
         * syslog, native and stdout-stream protocols (roles inferred from
         * the names — TODO confirm against the setup code). */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* Journal under /run, used by find_journal() whenever no system
         * journal is open; the system journal under /var; and per-user
         * journals under /var, keyed by uid. */
        JournalFile *runtime_journal;
        JournalFile *system_journal;
        Hashmap *user_journals;

        /* Sequence number handed to every journal_file_append_entry(). */
        uint64_t seqnum;

        /* Receive buffer; its use is not visible in this chunk. */
        char *buffer;
        size_t buffer_size;

        JournalRateLimit *rate_limit;

        /* Copied into newly opened user journals; keep_free is also used
         * for vacuuming and by available_space(). */
        JournalMetrics metrics;
        /* Upper bound for the disk usage of the journal directory. */
        uint64_t max_use;
        /* Copied into newly opened user journals. */
        bool compress;

        /* Result cache of available_space(). The timestamp is taken from
         * CLOCK_MONOTONIC; server_vacuum() resets it to 0 to force a
         * recomputation. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* Connected stdout stream clients and their count. */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
82
/* Parser state of a stdout stream connection, driven by
 * stdout_stream_line(): the first four lines form a header (tag,
 * default priority, whether lines carry a "<N>" priority prefix,
 * whether to copy lines to the console); afterwards every line is
 * logged (STDOUT_STREAM_RUNNING). */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,
        STDOUT_STREAM_PRIORITY,
        STDOUT_STREAM_PRIORITY_PREFIX,
        STDOUT_STREAM_TEE_CONSOLE,
        STDOUT_STREAM_RUNNING
} StdoutStreamState;
90
/* One connected stdout stream client. */
struct StdoutStream {
        Server *server;
        StdoutStreamState state;

        /* Descriptor the stream data is read from. */
        int fd;

        /* Peer credentials, passed along with every entry logged from
         * this stream. */
        struct ucred ucred;

        /* Header values parsed by stdout_stream_line(). tag may be NULL
         * if the client sent an empty tag line. */
        char *tag;
        int priority;
        bool priority_prefix:1;
        bool tee_console:1;

        /* Line assembly buffer; length is the number of bytes currently
         * held. At most LINE_MAX bytes are ever read into it. */
        char buffer[LINE_MAX+1];
        size_t length;

        LIST_FIELDS(StdoutStream, stdout_stream);
};
109
110 static uint64_t available_space(Server *s) {
111         char ids[33];
112         sd_id128_t machine;
113         char *p;
114         const char *f;
115         struct statvfs ss;
116         uint64_t sum = 0, avail = 0, ss_avail = 0;
117         int r;
118         DIR *d;
119         usec_t ts = now(CLOCK_MONOTONIC);
120
121         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
122                 return s->cached_available_space;
123
124         r = sd_id128_get_machine(&machine);
125         if (r < 0)
126                 return 0;
127
128         if (s->system_journal)
129                 f = "/var/log/journal/";
130         else
131                 f = "/run/log/journal/";
132
133         p = strappend(f, sd_id128_to_string(machine, ids));
134         if (!p)
135                 return 0;
136
137         d = opendir(p);
138         free(p);
139
140         if (!d)
141                 return 0;
142
143         if (fstatvfs(dirfd(d), &ss) < 0)
144                 goto finish;
145
146         for (;;) {
147                 struct stat st;
148                 struct dirent buf, *de;
149                 int k;
150
151                 k = readdir_r(d, &buf, &de);
152                 if (k != 0) {
153                         r = -k;
154                         goto finish;
155                 }
156
157                 if (!de)
158                         break;
159
160                 if (!dirent_is_file_with_suffix(de, ".journal"))
161                         continue;
162
163                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
164                         continue;
165
166                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
167         }
168
169         avail = sum >= s->max_use ? 0 : s->max_use - sum;
170
171         ss_avail = ss.f_bsize * ss.f_bavail;
172
173         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
174
175         if (ss_avail < avail)
176                 avail = ss_avail;
177
178         s->cached_available_space = avail;
179         s->cached_available_space_timestamp = ts;
180
181 finish:
182         closedir(d);
183
184         return avail;
185 }
186
187 static void fix_perms(JournalFile *f, uid_t uid) {
188         acl_t acl;
189         acl_entry_t entry;
190         acl_permset_t permset;
191         int r;
192
193         assert(f);
194
195         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
196         if (r < 0)
197                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
198
199         if (uid <= 0)
200                 return;
201
202         acl = acl_get_fd(f->fd);
203         if (!acl) {
204                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
205                 return;
206         }
207
208         r = acl_find_uid(acl, uid, &entry);
209         if (r <= 0) {
210
211                 if (acl_create_entry(&acl, &entry) < 0 ||
212                     acl_set_tag_type(entry, ACL_USER) < 0 ||
213                     acl_set_qualifier(entry, &uid) < 0) {
214                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
215                         goto finish;
216                 }
217         }
218
219         if (acl_get_permset(entry, &permset) < 0 ||
220             acl_add_perm(permset, ACL_READ) < 0 ||
221             acl_calc_mask(&acl) < 0) {
222                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
223                 goto finish;
224         }
225
226         if (acl_set_fd(f->fd, acl) < 0)
227                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
228
229 finish:
230         acl_free(acl);
231 }
232
233 static JournalFile* find_journal(Server *s, uid_t uid) {
234         char *p;
235         int r;
236         JournalFile *f;
237         char ids[33];
238         sd_id128_t machine;
239
240         assert(s);
241
242         /* We split up user logs only on /var, not on /run */
243         if (!s->system_journal)
244                 return s->runtime_journal;
245
246         if (uid <= 0)
247                 return s->system_journal;
248
249         r = sd_id128_get_machine(&machine);
250         if (r < 0)
251                 return s->system_journal;
252
253         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
254         if (f)
255                 return f;
256
257         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
258                 return s->system_journal;
259
260         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
261                 /* Too many open? Then let's close one */
262                 f = hashmap_steal_first(s->user_journals);
263                 assert(f);
264                 journal_file_close(f);
265         }
266
267         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
268         free(p);
269
270         if (r < 0)
271                 return s->system_journal;
272
273         fix_perms(f, uid);
274         f->metrics = s->metrics;
275         f->compress = s->compress;
276
277         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
278         if (r < 0) {
279                 journal_file_close(f);
280                 return s->system_journal;
281         }
282
283         return f;
284 }
285
286 static void server_vacuum(Server *s) {
287         Iterator i;
288         void *k;
289         char *p;
290         char ids[33];
291         sd_id128_t machine;
292         int r;
293         JournalFile *f;
294
295         log_info("Rotating...");
296
297         if (s->runtime_journal) {
298                 r = journal_file_rotate(&s->runtime_journal);
299                 if (r < 0)
300                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
301         }
302
303         if (s->system_journal) {
304                 r = journal_file_rotate(&s->system_journal);
305                 if (r < 0)
306                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
307         }
308
309         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
310                 r = journal_file_rotate(&f);
311                 if (r < 0)
312                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
313                 else
314                         hashmap_replace(s->user_journals, k, f);
315         }
316
317         log_info("Vacuuming...");
318
319         r = sd_id128_get_machine(&machine);
320         if (r < 0) {
321                 log_error("Failed to get machine ID: %s", strerror(-r));
322                 return;
323         }
324
325         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
326                 log_error("Out of memory.");
327                 return;
328         }
329
330         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
331         if (r < 0 && r != -ENOENT)
332                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
333         free(p);
334
335         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
336                 log_error("Out of memory.");
337                 return;
338         }
339
340         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
341         if (r < 0 && r != -ENOENT)
342                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
343         free(p);
344
345         s->cached_available_space_timestamp = 0;
346 }
347
348 static char *shortened_cgroup_path(pid_t pid) {
349         int r;
350         char *process_path, *init_path, *path;
351
352         assert(pid > 0);
353
354         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
355         if (r < 0)
356                 return NULL;
357
358         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
359         if (r < 0) {
360                 free(process_path);
361                 return NULL;
362         }
363
364         if (streq(init_path, "/"))
365                 init_path[0] = 0;
366
367         if (startswith(process_path, init_path))
368                 path = process_path + strlen(init_path);
369         else
370                 path = process_path;
371
372         free(init_path);
373
374         return path;
375 }
376
377 static void dispatch_message_real(Server *s,
378                              struct iovec *iovec, unsigned n, unsigned m,
379                              struct ucred *ucred,
380                              struct timeval *tv) {
381
382         char *pid = NULL, *uid = NULL, *gid = NULL,
383                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
384                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
385                 *audit_session = NULL, *audit_loginuid = NULL,
386                 *exe = NULL, *cgroup = NULL;
387
388         char idbuf[33];
389         sd_id128_t id;
390         int r;
391         char *t;
392         uid_t loginuid = 0, realuid = 0;
393         JournalFile *f;
394         bool vacuumed = false;
395
396         assert(s);
397         assert(iovec);
398         assert(n > 0);
399         assert(n + 13 <= m);
400
401         if (ucred) {
402                 uint32_t session;
403                 char *path;
404
405                 realuid = ucred->uid;
406
407                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
408                         IOVEC_SET_STRING(iovec[n++], pid);
409
410                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
411                         IOVEC_SET_STRING(iovec[n++], uid);
412
413                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
414                         IOVEC_SET_STRING(iovec[n++], gid);
415
416                 r = get_process_comm(ucred->pid, &t);
417                 if (r >= 0) {
418                         comm = strappend("_COMM=", t);
419                         if (comm)
420                                 IOVEC_SET_STRING(iovec[n++], comm);
421                         free(t);
422                 }
423
424                 r = get_process_exe(ucred->pid, &t);
425                 if (r >= 0) {
426                         exe = strappend("_EXE=", t);
427                         if (comm)
428                                 IOVEC_SET_STRING(iovec[n++], exe);
429                         free(t);
430                 }
431
432                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
433                 if (r >= 0) {
434                         cmdline = strappend("_CMDLINE=", t);
435                         if (cmdline)
436                                 IOVEC_SET_STRING(iovec[n++], cmdline);
437                         free(t);
438                 }
439
440                 r = audit_session_from_pid(ucred->pid, &session);
441                 if (r >= 0)
442                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
443                                 IOVEC_SET_STRING(iovec[n++], audit_session);
444
445                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
446                 if (r >= 0)
447                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
448                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
449
450                 path = shortened_cgroup_path(ucred->pid);
451                 if (path) {
452                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
453                         if (cgroup)
454                                 IOVEC_SET_STRING(iovec[n++], cgroup);
455
456                         free(path);
457                 }
458         }
459
460         if (tv) {
461                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
462                              (unsigned long long) timeval_load(tv)) >= 0)
463                         IOVEC_SET_STRING(iovec[n++], source_time);
464         }
465
466         /* Note that strictly speaking storing the boot id here is
467          * redundant since the entry includes this in-line
468          * anyway. However, we need this indexed, too. */
469         r = sd_id128_get_boot(&id);
470         if (r >= 0)
471                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
472                         IOVEC_SET_STRING(iovec[n++], boot_id);
473
474         r = sd_id128_get_machine(&id);
475         if (r >= 0)
476                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
477                         IOVEC_SET_STRING(iovec[n++], machine_id);
478
479         t = gethostname_malloc();
480         if (t) {
481                 hostname = strappend("_HOSTNAME=", t);
482                 if (hostname)
483                         IOVEC_SET_STRING(iovec[n++], hostname);
484                 free(t);
485         }
486
487         assert(n <= m);
488
489 retry:
490         f = find_journal(s, realuid == 0 ? 0 : loginuid);
491         if (!f)
492                 log_warning("Dropping message, as we can't find a place to store the data.");
493         else {
494                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
495
496                 if (r == -E2BIG && !vacuumed) {
497                         log_info("Allocation limit reached.");
498
499                         server_vacuum(s);
500                         vacuumed = true;
501
502                         log_info("Retrying write.");
503                         goto retry;
504                 }
505
506                 if (r < 0)
507                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
508         }
509
510         free(pid);
511         free(uid);
512         free(gid);
513         free(comm);
514         free(exe);
515         free(cmdline);
516         free(source_time);
517         free(boot_id);
518         free(machine_id);
519         free(hostname);
520         free(audit_session);
521         free(audit_loginuid);
522         free(cgroup);
523 }
524
525 static void dispatch_message(Server *s,
526                              struct iovec *iovec, unsigned n, unsigned m,
527                              struct ucred *ucred,
528                              struct timeval *tv,
529                              int priority) {
530         int rl;
531         char *path, *c;
532
533         assert(s);
534         assert(iovec || n == 0);
535
536         if (n == 0)
537                 return;
538
539         if (!ucred)
540                 goto finish;
541
542         path = shortened_cgroup_path(ucred->pid);
543         if (!path)
544                 goto finish;
545
546         /* example: /user/lennart/3/foobar
547          *          /system/dbus.service/foobar
548          *
549          * So let's cut of everything past the third /, since that is
550          * wher user directories start */
551
552         c = strchr(path, '/');
553         if (c) {
554                 c = strchr(c+1, '/');
555                 if (c) {
556                         c = strchr(c+1, '/');
557                         if (c)
558                                 *c = 0;
559                 }
560         }
561
562         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
563
564         if (rl == 0) {
565                 free(path);
566                 return;
567         }
568
569         if (rl > 1) {
570                 int j = 0;
571                 char suppress_message[LINE_MAX];
572                 struct iovec suppress_iovec[15];
573
574                 /* Write a suppression message if we suppressed something */
575
576                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
577                 char_array_0(suppress_message);
578
579                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
580                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
581
582                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
583         }
584
585         free(path);
586
587 finish:
588         dispatch_message_real(s, iovec, n, m, ucred, tv);
589 }
590
591 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
592         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
593         struct iovec iovec[16];
594         unsigned n = 0;
595         int priority = LOG_USER | LOG_INFO;
596
597         assert(s);
598         assert(buf);
599
600         parse_syslog_priority((char**) &buf, &priority);
601         skip_syslog_date((char**) &buf);
602
603         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
604                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
605
606         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
607                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
608
609         message = strappend("MESSAGE=", buf);
610         if (message)
611                 IOVEC_SET_STRING(iovec[n++], message);
612
613         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
614
615         free(message);
616         free(syslog_facility);
617         free(syslog_priority);
618 }
619
/* Checks whether the first l bytes of p are an acceptable field name for
 * client-supplied data. The rules roughly follow the POSIX
 * recommendations for environment variable names, plus a few extra
 * restrictions:
 *
 *   - 1 to 64 characters long,
 *   - must not start with '_' (such names are reserved for the trusted
 *     fields the journal adds itself),
 *   - must not start with a digit,
 *   - only 'A'-'Z', '0'-'9' and '_' are allowed.
 *
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        if (l == 0 || l > 64)
                return false;

        if (p[0] == '_' || (p[0] >= '0' && p[0] <= '9'))
                return false;

        for (i = 0; i < l; i++) {
                char ch = p[i];
                bool upper = ch >= 'A' && ch <= 'Z';
                bool digit = ch >= '0' && ch <= '9';

                if (!upper && !digit && ch != '_')
                        return false;
        }

        return true;
}
654
655 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
656         struct iovec *iovec = NULL;
657         unsigned n = 0, m = 0, j;
658         const char *p;
659         size_t remaining;
660         int priority = LOG_INFO;
661
662         assert(s);
663         assert(buffer || n == 0);
664
665         p = buffer;
666         remaining = buffer_size;
667
668         while (remaining > 0) {
669                 const char *e, *q;
670
671                 e = memchr(p, '\n', remaining);
672
673                 if (!e) {
674                         /* Trailing noise, let's ignore it, and flush what we collected */
675                         log_debug("Received message with trailing noise, ignoring.");
676                         break;
677                 }
678
679                 if (e == p) {
680                         /* Entry separator */
681                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
682                         n = 0;
683                         priority = LOG_INFO;
684
685                         p++;
686                         remaining--;
687                         continue;
688                 }
689
690                 if (*p == '.' || *p == '#') {
691                         /* Ignore control commands for now, and
692                          * comments too. */
693                         remaining -= (e - p) + 1;
694                         p = e + 1;
695                         continue;
696                 }
697
698                 /* A property follows */
699
700                 if (n+13 >= m) {
701                         struct iovec *c;
702                         unsigned u;
703
704                         u = MAX((n+13U) * 2U, 4U);
705                         c = realloc(iovec, u * sizeof(struct iovec));
706                         if (!c) {
707                                 log_error("Out of memory");
708                                 break;
709                         }
710
711                         iovec = c;
712                         m = u;
713                 }
714
715                 q = memchr(p, '=', e - p);
716                 if (q) {
717                         if (valid_user_field(p, q - p)) {
718                                 /* If the field name starts with an
719                                  * underscore, skip the variable,
720                                  * since that indidates a trusted
721                                  * field */
722                                 iovec[n].iov_base = (char*) p;
723                                 iovec[n].iov_len = e - p;
724                                 n++;
725
726                                 /* We need to determine the priority
727                                  * of this entry for the rate limiting
728                                  * logic */
729                                 if (e - p == 10 &&
730                                     memcmp(p, "PRIORITY=", 10) == 0 &&
731                                     p[10] >= '0' &&
732                                     p[10] <= '9')
733                                         priority = p[10] - '0';
734                         }
735
736                         remaining -= (e - p) + 1;
737                         p = e + 1;
738                         continue;
739                 } else {
740                         uint64_t l;
741                         char *k;
742
743                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
744                                 log_debug("Failed to parse message, ignoring.");
745                                 break;
746                         }
747
748                         memcpy(&l, e + 1, sizeof(uint64_t));
749                         l = le64toh(l);
750
751                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
752                             e[1+sizeof(uint64_t)+l] != '\n') {
753                                 log_debug("Failed to parse message, ignoring.");
754                                 break;
755                         }
756
757                         k = malloc((e - p) + 1 + l);
758                         if (!k) {
759                                 log_error("Out of memory");
760                                 break;
761                         }
762
763                         memcpy(k, p, e - p);
764                         k[e - p] = '=';
765                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
766
767                         if (valid_user_field(p, e - p)) {
768                                 iovec[n].iov_base = k;
769                                 iovec[n].iov_len = (e - p) + 1 + l;
770                                 n++;
771                         } else
772                                 free(k);
773
774                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
775                         p = e + 1 + sizeof(uint64_t) + l + 1;
776                 }
777         }
778
779         dispatch_message(s, iovec, n, m, ucred, tv, priority);
780
781         for (j = 0; j < n; j++)
782                 if (iovec[j].iov_base < buffer ||
783                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
784                         free(iovec[j].iov_base);
785 }
786
787 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
788         struct iovec iovec[15];
789         char *message = NULL, *syslog_priority = NULL;
790         unsigned n = 0;
791         size_t tag_len;
792         int priority;
793
794         assert(s);
795         assert(p);
796
797         priority = s->priority;
798
799         if (s->priority_prefix &&
800             l > 3 &&
801             p[0] == '<' &&
802             p[1] >= '0' && p[1] <= '7' &&
803             p[2] == '>') {
804
805                 priority = p[1] - '0';
806                 p += 3;
807                 l -= 3;
808         }
809
810         if (l <= 0)
811                 return 0;
812
813         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
814                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
815
816         tag_len = s->tag ? strlen(s->tag) + 2: 0;
817         message = malloc(8 + tag_len + l);
818         if (message) {
819                 memcpy(message, "MESSAGE=", 8);
820
821                 if (s->tag) {
822                         memcpy(message+8, s->tag, tag_len-2);
823                         memcpy(message+8+tag_len-2, ": ", 2);
824                 }
825
826                 memcpy(message+8+tag_len, p, l);
827                 iovec[n].iov_base = message;
828                 iovec[n].iov_len = 8+tag_len+l;
829                 n++;
830         }
831
832         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
833
834         if (s->tee_console) {
835                 int console;
836
837                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
838                 if (console >= 0) {
839                         n = 0;
840                         if (s->tag) {
841                                 IOVEC_SET_STRING(iovec[n++], s->tag);
842                                 IOVEC_SET_STRING(iovec[n++], ": ");
843                         }
844
845                         iovec[n].iov_base = (void*) p;
846                         iovec[n].iov_len = l;
847                         n++;
848
849                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
850
851                         writev(console, iovec, n);
852                 }
853         }
854
855         free(message);
856         free(syslog_priority);
857
858         return 0;
859 }
860
861 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
862         assert(s);
863         assert(p);
864
865         while (l > 0 && strchr(WHITESPACE, *p)) {
866                 l--;
867                 p++;
868         }
869
870         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
871                 l--;
872
873         switch (s->state) {
874
875         case STDOUT_STREAM_TAG:
876
877                 if (l > 0) {
878                         s->tag = strndup(p, l);
879                         if (!s->tag) {
880                                 log_error("Out of memory");
881                                 return -EINVAL;
882                         }
883                 }
884
885                 s->state = STDOUT_STREAM_PRIORITY;
886                 return 0;
887
888         case STDOUT_STREAM_PRIORITY:
889                 if (l != 1 || *p < '0' || *p > '7') {
890                         log_warning("Failed to parse log priority line.");
891                         return -EINVAL;
892                 }
893
894                 s->priority = *p - '0';
895                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
896                 return 0;
897
898         case STDOUT_STREAM_PRIORITY_PREFIX:
899                 if (l != 1 || *p < '0' || *p > '1') {
900                         log_warning("Failed to parse priority prefix line.");
901                         return -EINVAL;
902                 }
903
904                 s->priority_prefix = *p - '0';
905                 s->state = STDOUT_STREAM_TEE_CONSOLE;
906                 return 0;
907
908         case STDOUT_STREAM_TEE_CONSOLE:
909                 if (l != 1 || *p < '0' || *p > '1') {
910                         log_warning("Failed to parse tee to console line.");
911                         return -EINVAL;
912                 }
913
914                 s->tee_console = *p - '0';
915                 s->state = STDOUT_STREAM_RUNNING;
916                 return 0;
917
918         case STDOUT_STREAM_RUNNING:
919                 return stdout_stream_log(s, p, l);
920         }
921
922         assert_not_reached("Unknown stream state");
923 }
924
925 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
926         char *p;
927         size_t remaining;
928         int r;
929
930         assert(s);
931
932         p = s->buffer;
933         remaining = s->length;
934         for (;;) {
935                 char *end;
936                 size_t skip;
937
938                 end = memchr(p, '\n', remaining);
939                 if (!end) {
940                         if (remaining >= LINE_MAX) {
941                                 end = p + LINE_MAX;
942                                 skip = LINE_MAX;
943                         } else
944                                 break;
945                 } else
946                         skip = end - p + 1;
947
948                 r = stdout_stream_line(s, p, end - p);
949                 if (r < 0)
950                         return r;
951
952                 remaining -= skip;
953                 p += skip;
954         }
955
956         if (force_flush && remaining > 0) {
957                 r = stdout_stream_line(s, p, remaining);
958                 if (r < 0)
959                         return r;
960
961                 p += remaining;
962                 remaining = 0;
963         }
964
965         if (p > s->buffer) {
966                 memmove(s->buffer, p, remaining);
967                 s->length = remaining;
968         }
969
970         return 0;
971 }
972
973 static int stdout_stream_process(StdoutStream *s) {
974         ssize_t l;
975         int r;
976
977         assert(s);
978
979         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
980         if (l < 0) {
981
982                 if (errno == EAGAIN)
983                         return 0;
984
985                 log_warning("Failed to read from stream: %m");
986                 return -errno;
987         }
988
989         if (l == 0) {
990                 r = stdout_stream_scan(s, true);
991                 if (r < 0)
992                         return r;
993
994                 return 0;
995         }
996
997         s->length += l;
998         r = stdout_stream_scan(s, false);
999         if (r < 0)
1000                 return r;
1001
1002         return 1;
1003
1004 }
1005
1006 static void stdout_stream_free(StdoutStream *s) {
1007         assert(s);
1008
1009         if (s->server) {
1010                 assert(s->server->n_stdout_streams > 0);
1011                 s->server->n_stdout_streams --;
1012                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1013         }
1014
1015         if (s->fd >= 0) {
1016                 if (s->server)
1017                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1018
1019                 close_nointr_nofail(s->fd);
1020         }
1021
1022         free(s->tag);
1023         free(s);
1024 }
1025
1026 static int stdout_stream_new(Server *s) {
1027         StdoutStream *stream;
1028         int fd, r;
1029         socklen_t len;
1030         struct epoll_event ev;
1031
1032         assert(s);
1033
1034         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1035         if (fd < 0) {
1036                 if (errno == EAGAIN)
1037                         return 0;
1038
1039                 log_error("Failed to accept stdout connection: %m");
1040                 return -errno;
1041         }
1042
1043         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1044                 log_warning("Too many stdout streams, refusing connection.");
1045                 close_nointr_nofail(fd);
1046                 return 0;
1047         }
1048
1049         stream = new0(StdoutStream, 1);
1050         if (!stream) {
1051                 log_error("Out of memory.");
1052                 close_nointr_nofail(fd);
1053                 return -ENOMEM;
1054         }
1055
1056         stream->fd = fd;
1057
1058         len = sizeof(stream->ucred);
1059         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1060                 log_error("Failed to determine peer credentials: %m");
1061                 r = -errno;
1062                 goto fail;
1063         }
1064
1065         if (shutdown(fd, SHUT_WR) < 0) {
1066                 log_error("Failed to shutdown writing side of socket: %m");
1067                 r = -errno;
1068                 goto fail;
1069         }
1070
1071         zero(ev);
1072         ev.data.ptr = stream;
1073         ev.events = EPOLLIN;
1074         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1075                 log_error("Failed to add stream to event loop: %m");
1076                 r = -errno;
1077                 goto fail;
1078         }
1079
1080         stream->server = s;
1081         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1082         s->n_stdout_streams ++;
1083
1084         return 0;
1085
1086 fail:
1087         stdout_stream_free(stream);
1088         return r;
1089 }
1090
1091 static int process_event(Server *s, struct epoll_event *ev) {
1092         assert(s);
1093
1094         if (ev->data.fd == s->signal_fd) {
1095                 struct signalfd_siginfo sfsi;
1096                 ssize_t n;
1097
1098                 if (ev->events != EPOLLIN) {
1099                         log_info("Got invalid event from epoll.");
1100                         return -EIO;
1101                 }
1102
1103                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1104                 if (n != sizeof(sfsi)) {
1105
1106                         if (n >= 0)
1107                                 return -EIO;
1108
1109                         if (errno == EINTR || errno == EAGAIN)
1110                                 return 0;
1111
1112                         return -errno;
1113                 }
1114
1115                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1116                 return 0;
1117
1118         } else if (ev->data.fd == s->native_fd ||
1119                    ev->data.fd == s->syslog_fd) {
1120
1121                 if (ev->events != EPOLLIN) {
1122                         log_info("Got invalid event from epoll.");
1123                         return -EIO;
1124                 }
1125
1126                 for (;;) {
1127                         struct msghdr msghdr;
1128                         struct iovec iovec;
1129                         struct ucred *ucred = NULL;
1130                         struct timeval *tv = NULL;
1131                         struct cmsghdr *cmsg;
1132                         union {
1133                                 struct cmsghdr cmsghdr;
1134                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1135                                             CMSG_SPACE(sizeof(struct timeval))];
1136                         } control;
1137                         ssize_t n;
1138                         int v;
1139
1140                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1141                                 log_error("SIOCINQ failed: %m");
1142                                 return -errno;
1143                         }
1144
1145                         if (v <= 0)
1146                                 return 1;
1147
1148                         if (s->buffer_size < (size_t) v) {
1149                                 void *b;
1150                                 size_t l;
1151
1152                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1153                                 b = realloc(s->buffer, l+1);
1154
1155                                 if (!b) {
1156                                         log_error("Couldn't increase buffer.");
1157                                         return -ENOMEM;
1158                                 }
1159
1160                                 s->buffer_size = l;
1161                                 s->buffer = b;
1162                         }
1163
1164                         zero(iovec);
1165                         iovec.iov_base = s->buffer;
1166                         iovec.iov_len = s->buffer_size;
1167
1168                         zero(control);
1169                         zero(msghdr);
1170                         msghdr.msg_iov = &iovec;
1171                         msghdr.msg_iovlen = 1;
1172                         msghdr.msg_control = &control;
1173                         msghdr.msg_controllen = sizeof(control);
1174
1175                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1176                         if (n < 0) {
1177
1178                                 if (errno == EINTR || errno == EAGAIN)
1179                                         return 1;
1180
1181                                 log_error("recvmsg() failed: %m");
1182                                 return -errno;
1183                         }
1184
1185                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1186
1187                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1188                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1189                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1190                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1191                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1192                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1193                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1194                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1195                         }
1196
1197                         if (ev->data.fd == s->syslog_fd) {
1198                                 char *e;
1199
1200                                 e = memchr(s->buffer, '\n', n);
1201                                 if (e)
1202                                         *e = 0;
1203                                 else
1204                                         s->buffer[n] = 0;
1205
1206                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1207                         } else
1208                                 process_native_message(s, s->buffer, n, ucred, tv);
1209                 }
1210
1211                 return 1;
1212
1213         } else if (ev->data.fd == s->stdout_fd) {
1214
1215                 if (ev->events != EPOLLIN) {
1216                         log_info("Got invalid event from epoll.");
1217                         return -EIO;
1218                 }
1219
1220                 stdout_stream_new(s);
1221                 return 1;
1222
1223         } else {
1224                 StdoutStream *stream;
1225
1226                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1227                         log_info("Got invalid event from epoll.");
1228                         return -EIO;
1229                 }
1230
1231                 /* If it is none of the well-known fds, it must be an
1232                  * stdout stream fd. Note that this is a bit ugly here
1233                  * (since we rely that none of the well-known fds
1234                  * could be interpreted as pointer), but nonetheless
1235                  * safe, since the well-known fds would never get an
1236                  * fd > 4096, i.e. beyond the first memory page */
1237
1238                 stream = ev->data.ptr;
1239
1240                 if (stdout_stream_process(stream) <= 0)
1241                         stdout_stream_free(stream);
1242
1243                 return 1;
1244         }
1245
1246         log_error("Unknown event.");
1247         return 0;
1248 }
1249
1250 static int system_journal_open(Server *s) {
1251         int r;
1252         char *fn;
1253         sd_id128_t machine;
1254         char ids[33];
1255
1256         r = sd_id128_get_machine(&machine);
1257         if (r < 0)
1258                 return r;
1259
1260         /* First try to create the machine path, but not the prefix */
1261         fn = strappend("/var/log/journal/", sd_id128_to_string(machine, ids));
1262         if (!fn)
1263                 return -ENOMEM;
1264         (void) mkdir(fn, 0755);
1265         free(fn);
1266
1267         /* The create the system journal file */
1268         fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1269         if (!fn)
1270                 return -ENOMEM;
1271
1272         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1273         free(fn);
1274
1275         if (r >= 0) {
1276                 s->system_journal->metrics = s->metrics;
1277                 s->system_journal->compress = s->compress;
1278
1279                 fix_perms(s->system_journal, 0);
1280                 return r;
1281         }
1282
1283         if (r < 0 && r != -ENOENT) {
1284                 log_error("Failed to open system journal: %s", strerror(-r));
1285                 return r;
1286         }
1287
1288         /* /var didn't work, so try /run, but this time we
1289          * create the prefix too */
1290         fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1291         if (!fn)
1292                 return -ENOMEM;
1293
1294         (void) mkdir_parents(fn, 0755);
1295         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1296         free(fn);
1297
1298         if (r < 0) {
1299                 log_error("Failed to open runtime journal: %s", strerror(-r));
1300                 return r;
1301         }
1302
1303         s->runtime_journal->metrics = s->metrics;
1304         s->runtime_journal->compress = s->compress;
1305
1306         fix_perms(s->runtime_journal, 0);
1307         return r;
1308 }
1309
1310 static int open_syslog_socket(Server *s) {
1311         union sockaddr_union sa;
1312         int one, r;
1313         struct epoll_event ev;
1314
1315         assert(s);
1316
1317         if (s->syslog_fd < 0) {
1318
1319                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1320                 if (s->syslog_fd < 0) {
1321                         log_error("socket() failed: %m");
1322                         return -errno;
1323                 }
1324
1325                 zero(sa);
1326                 sa.un.sun_family = AF_UNIX;
1327                 strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1328
1329                 unlink(sa.un.sun_path);
1330
1331                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1332                 if (r < 0) {
1333                         log_error("bind() failed: %m");
1334                         return -errno;
1335                 }
1336
1337                 chmod(sa.un.sun_path, 0666);
1338         }
1339
1340         one = 1;
1341         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1342         if (r < 0) {
1343                 log_error("SO_PASSCRED failed: %m");
1344                 return -errno;
1345         }
1346
1347         one = 1;
1348         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1349         if (r < 0) {
1350                 log_error("SO_TIMESTAMP failed: %m");
1351                 return -errno;
1352         }
1353
1354         zero(ev);
1355         ev.events = EPOLLIN;
1356         ev.data.fd = s->syslog_fd;
1357         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1358                 log_error("Failed to add syslog server fd to epoll object: %m");
1359                 return -errno;
1360         }
1361
1362         return 0;
1363 }
1364
1365 static int open_native_socket(Server*s) {
1366         union sockaddr_union sa;
1367         int one, r;
1368         struct epoll_event ev;
1369
1370         assert(s);
1371
1372         if (s->native_fd < 0) {
1373
1374                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1375                 if (s->native_fd < 0) {
1376                         log_error("socket() failed: %m");
1377                         return -errno;
1378                 }
1379
1380                 zero(sa);
1381                 sa.un.sun_family = AF_UNIX;
1382                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1383
1384                 unlink(sa.un.sun_path);
1385
1386                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1387                 if (r < 0) {
1388                         log_error("bind() failed: %m");
1389                         return -errno;
1390                 }
1391
1392                 chmod(sa.un.sun_path, 0666);
1393         }
1394
1395         one = 1;
1396         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1397         if (r < 0) {
1398                 log_error("SO_PASSCRED failed: %m");
1399                 return -errno;
1400         }
1401
1402         one = 1;
1403         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1404         if (r < 0) {
1405                 log_error("SO_TIMESTAMP failed: %m");
1406                 return -errno;
1407         }
1408
1409         zero(ev);
1410         ev.events = EPOLLIN;
1411         ev.data.fd = s->native_fd;
1412         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1413                 log_error("Failed to add native server fd to epoll object: %m");
1414                 return -errno;
1415         }
1416
1417         return 0;
1418 }
1419
1420 static int open_stdout_socket(Server *s) {
1421         union sockaddr_union sa;
1422         int r;
1423         struct epoll_event ev;
1424
1425         assert(s);
1426
1427         if (s->stdout_fd < 0) {
1428
1429                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1430                 if (s->stdout_fd < 0) {
1431                         log_error("socket() failed: %m");
1432                         return -errno;
1433                 }
1434
1435                 zero(sa);
1436                 sa.un.sun_family = AF_UNIX;
1437                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1438
1439                 unlink(sa.un.sun_path);
1440
1441                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1442                 if (r < 0) {
1443                         log_error("bind() failed: %m");
1444                         return -errno;
1445                 }
1446
1447                 chmod(sa.un.sun_path, 0666);
1448
1449                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1450                         log_error("liste() failed: %m");
1451                         return -errno;
1452                 }
1453         }
1454
1455         zero(ev);
1456         ev.events = EPOLLIN;
1457         ev.data.fd = s->stdout_fd;
1458         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1459                 log_error("Failed to add stdout server fd to epoll object: %m");
1460                 return -errno;
1461         }
1462
1463         return 0;
1464 }
1465
1466 static int open_signalfd(Server *s) {
1467         sigset_t mask;
1468         struct epoll_event ev;
1469
1470         assert(s);
1471
1472         assert_se(sigemptyset(&mask) == 0);
1473         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
1474         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1475
1476         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1477         if (s->signal_fd < 0) {
1478                 log_error("signalfd(): %m");
1479                 return -errno;
1480         }
1481
1482         zero(ev);
1483         ev.events = EPOLLIN;
1484         ev.data.fd = s->signal_fd;
1485
1486         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1487                 log_error("epoll_ctl(): %m");
1488                 return -errno;
1489         }
1490
1491         return 0;
1492 }
1493
1494 static int server_init(Server *s) {
1495         int n, r, fd;
1496
1497         assert(s);
1498
1499         zero(*s);
1500         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1501         s->metrics.max_size = DEFAULT_MAX_SIZE;
1502         s->metrics.min_size = DEFAULT_MIN_SIZE;
1503         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1504         s->max_use = DEFAULT_MAX_USE;
1505         s->compress = true;
1506
1507         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1508         if (!s->user_journals) {
1509                 log_error("Out of memory.");
1510                 return -ENOMEM;
1511         }
1512
1513         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1514         if (s->epoll_fd < 0) {
1515                 log_error("Failed to create epoll object: %m");
1516                 return -errno;
1517         }
1518
1519         n = sd_listen_fds(true);
1520         if (n < 0) {
1521                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1522                 return n;
1523         }
1524
1525         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1526
1527                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1528
1529                         if (s->native_fd >= 0) {
1530                                 log_error("Too many native sockets passed.");
1531                                 return -EINVAL;
1532                         }
1533
1534                         s->native_fd = fd;
1535
1536                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1537
1538                         if (s->stdout_fd >= 0) {
1539                                 log_error("Too many stdout sockets passed.");
1540                                 return -EINVAL;
1541                         }
1542
1543                         s->stdout_fd = fd;
1544
1545                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1546
1547                         if (s->syslog_fd >= 0) {
1548                                 log_error("Too many /dev/log sockets passed.");
1549                                 return -EINVAL;
1550                         }
1551
1552                         s->syslog_fd = fd;
1553
1554                 } else {
1555                         log_error("Unknown socket passed.");
1556                         return -EINVAL;
1557                 }
1558         }
1559
1560         r = open_syslog_socket(s);
1561         if (r < 0)
1562                 return r;
1563
1564         r = open_native_socket(s);
1565         if (r < 0)
1566                 return r;
1567
1568         r = open_stdout_socket(s);
1569         if (r < 0)
1570                 return r;
1571
1572         r = system_journal_open(s);
1573         if (r < 0)
1574                 return r;
1575
1576         r = open_signalfd(s);
1577         if (r < 0)
1578                 return r;
1579
1580         s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST);
1581         if (!s->rate_limit)
1582                 return -ENOMEM;
1583
1584         return 0;
1585 }
1586
1587 static void server_done(Server *s) {
1588         JournalFile *f;
1589         assert(s);
1590
1591         while (s->stdout_streams)
1592                 stdout_stream_free(s->stdout_streams);
1593
1594         if (s->system_journal)
1595                 journal_file_close(s->system_journal);
1596
1597         if (s->runtime_journal)
1598                 journal_file_close(s->runtime_journal);
1599
1600         while ((f = hashmap_steal_first(s->user_journals)))
1601                 journal_file_close(f);
1602
1603         hashmap_free(s->user_journals);
1604
1605         if (s->epoll_fd >= 0)
1606                 close_nointr_nofail(s->epoll_fd);
1607
1608         if (s->signal_fd >= 0)
1609                 close_nointr_nofail(s->signal_fd);
1610
1611         if (s->syslog_fd >= 0)
1612                 close_nointr_nofail(s->syslog_fd);
1613
1614         if (s->native_fd >= 0)
1615                 close_nointr_nofail(s->native_fd);
1616
1617         if (s->stdout_fd >= 0)
1618                 close_nointr_nofail(s->stdout_fd);
1619
1620         if (s->rate_limit)
1621                 journal_rate_limit_free(s->rate_limit);
1622 }
1623
1624 int main(int argc, char *argv[]) {
1625         Server server;
1626         int r;
1627
1628         /* if (getppid() != 1) { */
1629         /*         log_error("This program should be invoked by init only."); */
1630         /*         return EXIT_FAILURE; */
1631         /* } */
1632
1633         if (argc > 1) {
1634                 log_error("This program does not take arguments.");
1635                 return EXIT_FAILURE;
1636         }
1637
1638         log_set_target(LOG_TARGET_CONSOLE);
1639         log_set_max_level(LOG_DEBUG);
1640         log_parse_environment();
1641         log_open();
1642
1643         umask(0022);
1644
1645         r = server_init(&server);
1646         if (r < 0)
1647                 goto finish;
1648
1649         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1650
1651         sd_notify(false,
1652                   "READY=1\n"
1653                   "STATUS=Processing requests...");
1654
1655         for (;;) {
1656                 struct epoll_event event;
1657
1658                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1659                 if (r < 0) {
1660
1661                         if (errno == EINTR)
1662                                 continue;
1663
1664                         log_error("epoll_wait() failed: %m");
1665                         r = -errno;
1666                         goto finish;
1667                 } else if (r == 0)
1668                         break;
1669
1670                 r = process_event(&server, &event);
1671                 if (r < 0)
1672                         goto finish;
1673                 else if (r == 0)
1674                         break;
1675         }
1676
1677         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1678
1679 finish:
1680         sd_notify(false,
1681                   "STATUS=Shutting down...");
1682
1683         server_done(&server);
1684
1685         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1686 }