chiark / gitweb /
journald: implement sophisticated rate limiting
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43
44 #define USER_JOURNALS_MAX 1024
45 #define STDOUT_STREAMS_MAX 4096
46
47 typedef struct StdoutStream StdoutStream;
48
49 typedef struct Server {
50         int epoll_fd;
51         int signal_fd;
52         int syslog_fd;
53         int native_fd;
54         int stdout_fd;
55
56         JournalFile *runtime_journal;
57         JournalFile *system_journal;
58         Hashmap *user_journals;
59
60         uint64_t seqnum;
61
62         char *buffer;
63         size_t buffer_size;
64
65         JournalRateLimit *rate_limit;
66
67         JournalMetrics metrics;
68         uint64_t max_use;
69         bool compress;
70
71         LIST_HEAD(StdoutStream, stdout_streams);
72         unsigned n_stdout_streams;
73 } Server;
74
75 typedef enum StdoutStreamState {
76         STDOUT_STREAM_TAG,
77         STDOUT_STREAM_PRIORITY,
78         STDOUT_STREAM_PRIORITY_PREFIX,
79         STDOUT_STREAM_TEE_CONSOLE,
80         STDOUT_STREAM_RUNNING
81 } StdoutStreamState;
82
83 struct StdoutStream {
84         Server *server;
85         StdoutStreamState state;
86
87         int fd;
88
89         struct ucred ucred;
90
91         char *tag;
92         int priority;
93         bool priority_prefix:1;
94         bool tee_console:1;
95
96         char buffer[LINE_MAX+1];
97         size_t length;
98
99         LIST_FIELDS(StdoutStream, stdout_stream);
100 };
101
102 static uint64_t available_space(Server *s) {
103         char ids[33];
104         sd_id128_t machine;
105         char *p;
106         const char *f;
107         struct statvfs ss;
108         uint64_t sum = 0, avail = 0, ss_avail = 0;
109         int r;
110         DIR *d;
111
112         r = sd_id128_get_machine(&machine);
113         if (r < 0)
114                 return 0;
115
116         if (s->system_journal)
117                 f = "/var/log/journal/";
118         else
119                 f = "/run/log/journal/";
120
121         p = strappend(f, sd_id128_to_string(machine, ids));
122         if (!p)
123                 return 0;
124
125         d = opendir(p);
126         free(p);
127
128         if (!d)
129                 return 0;
130
131         if (fstatvfs(dirfd(d), &ss) < 0)
132                 goto finish;
133
134         for (;;) {
135                 struct stat st;
136                 struct dirent buf, *de;
137                 int k;
138
139                 k = readdir_r(d, &buf, &de);
140                 if (k != 0) {
141                         r = -k;
142                         goto finish;
143                 }
144
145                 if (!de)
146                         break;
147
148                 if (!dirent_is_file_with_suffix(de, ".journal"))
149                         continue;
150
151                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
152                         continue;
153
154                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
155         }
156
157         avail = sum >= s->max_use ? 0 : s->max_use - sum;
158
159         ss_avail = ss.f_bsize * ss.f_bavail;
160
161         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
162
163         if (ss_avail < avail)
164                 avail = ss_avail;
165
166 finish:
167         closedir(d);
168
169         return avail;
170 }
171
172 static void fix_perms(JournalFile *f, uid_t uid) {
173         acl_t acl;
174         acl_entry_t entry;
175         acl_permset_t permset;
176         int r;
177
178         assert(f);
179
180         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
181         if (r < 0)
182                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
183
184         if (uid <= 0)
185                 return;
186
187         acl = acl_get_fd(f->fd);
188         if (!acl) {
189                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
190                 return;
191         }
192
193         r = acl_find_uid(acl, uid, &entry);
194         if (r <= 0) {
195
196                 if (acl_create_entry(&acl, &entry) < 0 ||
197                     acl_set_tag_type(entry, ACL_USER) < 0 ||
198                     acl_set_qualifier(entry, &uid) < 0) {
199                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
200                         goto finish;
201                 }
202         }
203
204         if (acl_get_permset(entry, &permset) < 0 ||
205             acl_add_perm(permset, ACL_READ) < 0 ||
206             acl_calc_mask(&acl) < 0) {
207                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
208                 goto finish;
209         }
210
211         if (acl_set_fd(f->fd, acl) < 0)
212                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
213
214 finish:
215         acl_free(acl);
216 }
217
218 static JournalFile* find_journal(Server *s, uid_t uid) {
219         char *p;
220         int r;
221         JournalFile *f;
222         char ids[33];
223         sd_id128_t machine;
224
225         assert(s);
226
227         /* We split up user logs only on /var, not on /run */
228         if (!s->system_journal)
229                 return s->runtime_journal;
230
231         if (uid <= 0)
232                 return s->system_journal;
233
234         r = sd_id128_get_machine(&machine);
235         if (r < 0)
236                 return s->system_journal;
237
238         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
239         if (f)
240                 return f;
241
242         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
243                 return s->system_journal;
244
245         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
246                 /* Too many open? Then let's close one */
247                 f = hashmap_steal_first(s->user_journals);
248                 assert(f);
249                 journal_file_close(f);
250         }
251
252         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
253         free(p);
254
255         if (r < 0)
256                 return s->system_journal;
257
258         fix_perms(f, uid);
259         f->metrics = s->metrics;
260         f->compress = s->compress;
261
262         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
263         if (r < 0) {
264                 journal_file_close(f);
265                 return s->system_journal;
266         }
267
268         return f;
269 }
270
271 static void server_vacuum(Server *s) {
272         Iterator i;
273         void *k;
274         char *p;
275         char ids[33];
276         sd_id128_t machine;
277         int r;
278         JournalFile *f;
279
280         log_info("Rotating...");
281
282         if (s->runtime_journal) {
283                 r = journal_file_rotate(&s->runtime_journal);
284                 if (r < 0)
285                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
286         }
287
288         if (s->system_journal) {
289                 r = journal_file_rotate(&s->system_journal);
290                 if (r < 0)
291                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
292         }
293
294         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
295                 r = journal_file_rotate(&f);
296                 if (r < 0)
297                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
298                 else
299                         hashmap_replace(s->user_journals, k, f);
300         }
301
302         log_info("Vacuuming...");
303
304         r = sd_id128_get_machine(&machine);
305         if (r < 0) {
306                 log_error("Failed to get machine ID: %s", strerror(-r));
307                 return;
308         }
309
310         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
311                 log_error("Out of memory.");
312                 return;
313         }
314
315         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
316         if (r < 0 && r != -ENOENT)
317                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
318         free(p);
319
320         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
321                 log_error("Out of memory.");
322                 return;
323         }
324
325         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
326         if (r < 0 && r != -ENOENT)
327                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
328         free(p);
329 }
330
331 static char *shortened_cgroup_path(pid_t pid) {
332         int r;
333         char *process_path, *init_path, *path;
334
335         assert(pid > 0);
336
337         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
338         if (r < 0)
339                 return NULL;
340
341         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
342         if (r < 0) {
343                 free(process_path);
344                 return NULL;
345         }
346
347         if (streq(init_path, "/"))
348                 init_path[0] = 0;
349
350         if (startswith(process_path, init_path))
351                 path = process_path + strlen(init_path);
352         else
353                 path = process_path;
354
355         free(init_path);
356
357         return path;
358 }
359
360 static void dispatch_message_real(Server *s,
361                              struct iovec *iovec, unsigned n, unsigned m,
362                              struct ucred *ucred,
363                              struct timeval *tv) {
364
365         char *pid = NULL, *uid = NULL, *gid = NULL,
366                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
367                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
368                 *audit_session = NULL, *audit_loginuid = NULL,
369                 *exe = NULL, *cgroup = NULL;
370
371         char idbuf[33];
372         sd_id128_t id;
373         int r;
374         char *t;
375         uid_t loginuid = 0, realuid = 0;
376         JournalFile *f;
377         bool vacuumed = false;
378
379         assert(s);
380         assert(iovec);
381         assert(n > 0);
382         assert(n + 13 <= m);
383
384         if (ucred) {
385                 uint32_t session;
386                 char *path;
387
388                 realuid = ucred->uid;
389
390                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
391                         IOVEC_SET_STRING(iovec[n++], pid);
392
393                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
394                         IOVEC_SET_STRING(iovec[n++], uid);
395
396                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
397                         IOVEC_SET_STRING(iovec[n++], gid);
398
399                 r = get_process_comm(ucred->pid, &t);
400                 if (r >= 0) {
401                         comm = strappend("_COMM=", t);
402                         if (comm)
403                                 IOVEC_SET_STRING(iovec[n++], comm);
404                         free(t);
405                 }
406
407                 r = get_process_exe(ucred->pid, &t);
408                 if (r >= 0) {
409                         exe = strappend("_EXE=", t);
410                         if (comm)
411                                 IOVEC_SET_STRING(iovec[n++], exe);
412                         free(t);
413                 }
414
415                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
416                 if (r >= 0) {
417                         cmdline = strappend("_CMDLINE=", t);
418                         if (cmdline)
419                                 IOVEC_SET_STRING(iovec[n++], cmdline);
420                         free(t);
421                 }
422
423                 r = audit_session_from_pid(ucred->pid, &session);
424                 if (r >= 0)
425                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
426                                 IOVEC_SET_STRING(iovec[n++], audit_session);
427
428                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
429                 if (r >= 0)
430                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
431                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
432
433                 path = shortened_cgroup_path(ucred->pid);
434                 if (path) {
435                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
436                         if (cgroup)
437                                 IOVEC_SET_STRING(iovec[n++], cgroup);
438
439                         free(path);
440                 }
441         }
442
443         if (tv) {
444                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
445                              (unsigned long long) timeval_load(tv)) >= 0)
446                         IOVEC_SET_STRING(iovec[n++], source_time);
447         }
448
449         /* Note that strictly speaking storing the boot id here is
450          * redundant since the entry includes this in-line
451          * anyway. However, we need this indexed, too. */
452         r = sd_id128_get_boot(&id);
453         if (r >= 0)
454                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
455                         IOVEC_SET_STRING(iovec[n++], boot_id);
456
457         r = sd_id128_get_machine(&id);
458         if (r >= 0)
459                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
460                         IOVEC_SET_STRING(iovec[n++], machine_id);
461
462         t = gethostname_malloc();
463         if (t) {
464                 hostname = strappend("_HOSTNAME=", t);
465                 if (hostname)
466                         IOVEC_SET_STRING(iovec[n++], hostname);
467                 free(t);
468         }
469
470         assert(n <= m);
471
472 retry:
473         f = find_journal(s, realuid == 0 ? 0 : loginuid);
474         if (!f)
475                 log_warning("Dropping message, as we can't find a place to store the data.");
476         else {
477                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
478
479                 if (r == -E2BIG && !vacuumed) {
480                         log_info("Allocation limit reached.");
481
482                         server_vacuum(s);
483                         vacuumed = true;
484
485                         log_info("Retrying write.");
486                         goto retry;
487                 }
488
489                 if (r < 0)
490                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
491         }
492
493         free(pid);
494         free(uid);
495         free(gid);
496         free(comm);
497         free(exe);
498         free(cmdline);
499         free(source_time);
500         free(boot_id);
501         free(machine_id);
502         free(hostname);
503         free(audit_session);
504         free(audit_loginuid);
505         free(cgroup);
506 }
507
508 static void dispatch_message(Server *s,
509                              struct iovec *iovec, unsigned n, unsigned m,
510                              struct ucred *ucred,
511                              struct timeval *tv,
512                              int priority) {
513         int rl;
514         char *path, *c;
515
516         assert(s);
517         assert(iovec || n == 0);
518
519         if (n == 0)
520                 return;
521
522         if (!ucred)
523                 goto finish;
524
525         path = shortened_cgroup_path(ucred->pid);
526         if (!path)
527                 goto finish;
528
529         /* example: /user/lennart/3/foobar
530          *          /system/dbus.service/foobar
531          *
532          * So let's cut of everything past the third /, since that is
533          * wher user directories start */
534
535         c = strchr(path, '/');
536         if (c) {
537                 c = strchr(c+1, '/');
538                 if (c) {
539                         c = strchr(c+1, '/');
540                         if (c)
541                                 *c = 0;
542                 }
543         }
544
545         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
546
547         if (rl == 0) {
548                 free(path);
549                 return;
550         }
551
552         if (rl > 1) {
553                 int j = 0;
554                 char suppress_message[LINE_MAX];
555                 struct iovec suppress_iovec[15];
556
557                 /* Write a suppression message if we suppressed something */
558
559                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
560                 char_array_0(suppress_message);
561
562                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
563                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
564
565                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
566         }
567
568         free(path);
569
570 finish:
571         dispatch_message_real(s, iovec, n, m, ucred, tv);
572 }
573
574 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
575         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
576         struct iovec iovec[16];
577         unsigned n = 0;
578         int priority = LOG_USER | LOG_INFO;
579
580         assert(s);
581         assert(buf);
582
583         parse_syslog_priority((char**) &buf, &priority);
584         skip_syslog_date((char**) &buf);
585
586         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
587                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
588
589         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
590                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
591
592         message = strappend("MESSAGE=", buf);
593         if (message)
594                 IOVEC_SET_STRING(iovec[n++], message);
595
596         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
597
598         free(message);
599         free(syslog_facility);
600         free(syslog_priority);
601 }
602
603 static bool valid_user_field(const char *p, size_t l) {
604         const char *a;
605
606         /* We kinda enforce POSIX syntax recommendations for
607            environment variables here, but make a couple of additional
608            requirements.
609
610            http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */
611
612         /* No empty field names */
613         if (l <= 0)
614                 return false;
615
616         /* Don't allow names longer than 64 chars */
617         if (l > 64)
618                 return false;
619
620         /* Variables starting with an underscore are protected */
621         if (p[0] == '_')
622                 return false;
623
624         /* Don't allow digits as first character */
625         if (p[0] >= '0' && p[0] <= '9')
626                 return false;
627
628         /* Only allow A-Z0-9 and '_' */
629         for (a = p; a < p + l; a++)
630                 if (!((*a >= 'A' && *a <= 'Z') ||
631                       (*a >= '0' && *a <= '9') ||
632                       *a == '_'))
633                         return false;
634
635         return true;
636 }
637
638 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
639         struct iovec *iovec = NULL;
640         unsigned n = 0, m = 0, j;
641         const char *p;
642         size_t remaining;
643         int priority = LOG_INFO;
644
645         assert(s);
646         assert(buffer || n == 0);
647
648         p = buffer;
649         remaining = buffer_size;
650
651         while (remaining > 0) {
652                 const char *e, *q;
653
654                 e = memchr(p, '\n', remaining);
655
656                 if (!e) {
657                         /* Trailing noise, let's ignore it, and flush what we collected */
658                         log_debug("Received message with trailing noise, ignoring.");
659                         break;
660                 }
661
662                 if (e == p) {
663                         /* Entry separator */
664                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
665                         n = 0;
666                         priority = LOG_INFO;
667
668                         p++;
669                         remaining--;
670                         continue;
671                 }
672
673                 if (*p == '.' || *p == '#') {
674                         /* Ignore control commands for now, and
675                          * comments too. */
676                         remaining -= (e - p) + 1;
677                         p = e + 1;
678                         continue;
679                 }
680
681                 /* A property follows */
682
683                 if (n+13 >= m) {
684                         struct iovec *c;
685                         unsigned u;
686
687                         u = MAX((n+13U) * 2U, 4U);
688                         c = realloc(iovec, u * sizeof(struct iovec));
689                         if (!c) {
690                                 log_error("Out of memory");
691                                 break;
692                         }
693
694                         iovec = c;
695                         m = u;
696                 }
697
698                 q = memchr(p, '=', e - p);
699                 if (q) {
700                         if (valid_user_field(p, q - p)) {
701                                 /* If the field name starts with an
702                                  * underscore, skip the variable,
703                                  * since that indidates a trusted
704                                  * field */
705                                 iovec[n].iov_base = (char*) p;
706                                 iovec[n].iov_len = e - p;
707                                 n++;
708
709                                 /* We need to determine the priority
710                                  * of this entry for the rate limiting
711                                  * logic */
712                                 if (e - p == 10 &&
713                                     memcmp(p, "PRIORITY=", 10) == 0 &&
714                                     p[10] >= '0' &&
715                                     p[10] <= '9')
716                                         priority = p[10] - '0';
717                         }
718
719                         remaining -= (e - p) + 1;
720                         p = e + 1;
721                         continue;
722                 } else {
723                         uint64_t l;
724                         char *k;
725
726                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
727                                 log_debug("Failed to parse message, ignoring.");
728                                 break;
729                         }
730
731                         memcpy(&l, e + 1, sizeof(uint64_t));
732                         l = le64toh(l);
733
734                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
735                             e[1+sizeof(uint64_t)+l] != '\n') {
736                                 log_debug("Failed to parse message, ignoring.");
737                                 break;
738                         }
739
740                         k = malloc((e - p) + 1 + l);
741                         if (!k) {
742                                 log_error("Out of memory");
743                                 break;
744                         }
745
746                         memcpy(k, p, e - p);
747                         k[e - p] = '=';
748                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
749
750                         if (valid_user_field(p, e - p)) {
751                                 iovec[n].iov_base = k;
752                                 iovec[n].iov_len = (e - p) + 1 + l;
753                                 n++;
754                         } else
755                                 free(k);
756
757                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
758                         p = e + 1 + sizeof(uint64_t) + l + 1;
759                 }
760         }
761
762         dispatch_message(s, iovec, n, m, ucred, tv, priority);
763
764         for (j = 0; j < n; j++)
765                 if (iovec[j].iov_base < buffer ||
766                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
767                         free(iovec[j].iov_base);
768 }
769
770 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
771         struct iovec iovec[15];
772         char *message = NULL, *syslog_priority = NULL;
773         unsigned n = 0;
774         size_t tag_len;
775         int priority;
776
777         assert(s);
778         assert(p);
779
780         priority = s->priority;
781
782         if (s->priority_prefix &&
783             l > 3 &&
784             p[0] == '<' &&
785             p[1] >= '0' && p[1] <= '7' &&
786             p[2] == '>') {
787
788                 priority = p[1] - '0';
789                 p += 3;
790                 l -= 3;
791         }
792
793         if (l <= 0)
794                 return 0;
795
796         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
797                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
798
799         tag_len = s->tag ? strlen(s->tag) + 2: 0;
800         message = malloc(8 + tag_len + l);
801         if (message) {
802                 memcpy(message, "MESSAGE=", 8);
803
804                 if (s->tag) {
805                         memcpy(message+8, s->tag, tag_len-2);
806                         memcpy(message+8+tag_len-2, ": ", 2);
807                 }
808
809                 memcpy(message+8+tag_len, p, l);
810                 iovec[n].iov_base = message;
811                 iovec[n].iov_len = 8+tag_len+l;
812                 n++;
813         }
814
815         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
816
817         if (s->tee_console) {
818                 int console;
819
820                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
821                 if (console >= 0) {
822                         n = 0;
823                         if (s->tag) {
824                                 IOVEC_SET_STRING(iovec[n++], s->tag);
825                                 IOVEC_SET_STRING(iovec[n++], ": ");
826                         }
827
828                         iovec[n].iov_base = (void*) p;
829                         iovec[n].iov_len = l;
830                         n++;
831
832                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
833
834                         writev(console, iovec, n);
835                 }
836         }
837
838         free(message);
839         free(syslog_priority);
840
841         return 0;
842 }
843
844 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
845         assert(s);
846         assert(p);
847
848         while (l > 0 && strchr(WHITESPACE, *p)) {
849                 l--;
850                 p++;
851         }
852
853         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
854                 l--;
855
856         switch (s->state) {
857
858         case STDOUT_STREAM_TAG:
859
860                 if (l > 0) {
861                         s->tag = strndup(p, l);
862                         if (!s->tag) {
863                                 log_error("Out of memory");
864                                 return -EINVAL;
865                         }
866                 }
867
868                 s->state = STDOUT_STREAM_PRIORITY;
869                 return 0;
870
871         case STDOUT_STREAM_PRIORITY:
872                 if (l != 1 || *p < '0' || *p > '7') {
873                         log_warning("Failed to parse log priority line.");
874                         return -EINVAL;
875                 }
876
877                 s->priority = *p - '0';
878                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
879                 return 0;
880
881         case STDOUT_STREAM_PRIORITY_PREFIX:
882                 if (l != 1 || *p < '0' || *p > '1') {
883                         log_warning("Failed to parse priority prefix line.");
884                         return -EINVAL;
885                 }
886
887                 s->priority_prefix = *p - '0';
888                 s->state = STDOUT_STREAM_TEE_CONSOLE;
889                 return 0;
890
891         case STDOUT_STREAM_TEE_CONSOLE:
892                 if (l != 1 || *p < '0' || *p > '1') {
893                         log_warning("Failed to parse tee to console line.");
894                         return -EINVAL;
895                 }
896
897                 s->tee_console = *p - '0';
898                 s->state = STDOUT_STREAM_RUNNING;
899                 return 0;
900
901         case STDOUT_STREAM_RUNNING:
902                 return stdout_stream_log(s, p, l);
903         }
904
905         assert_not_reached("Unknown stream state");
906 }
907
908 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
909         char *p;
910         size_t remaining;
911         int r;
912
913         assert(s);
914
915         p = s->buffer;
916         remaining = s->length;
917         for (;;) {
918                 char *end;
919                 size_t skip;
920
921                 end = memchr(p, '\n', remaining);
922                 if (!end) {
923                         if (remaining >= LINE_MAX) {
924                                 end = p + LINE_MAX;
925                                 skip = LINE_MAX;
926                         } else
927                                 break;
928                 } else
929                         skip = end - p + 1;
930
931                 r = stdout_stream_line(s, p, end - p);
932                 if (r < 0)
933                         return r;
934
935                 remaining -= skip;
936                 p += skip;
937         }
938
939         if (force_flush && remaining > 0) {
940                 r = stdout_stream_line(s, p, remaining);
941                 if (r < 0)
942                         return r;
943
944                 p += remaining;
945                 remaining = 0;
946         }
947
948         if (p > s->buffer) {
949                 memmove(s->buffer, p, remaining);
950                 s->length = remaining;
951         }
952
953         return 0;
954 }
955
956 static int stdout_stream_process(StdoutStream *s) {
957         ssize_t l;
958         int r;
959
960         assert(s);
961
962         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
963         if (l < 0) {
964
965                 if (errno == EAGAIN)
966                         return 0;
967
968                 log_warning("Failed to read from stream: %m");
969                 return -errno;
970         }
971
972         if (l == 0) {
973                 r = stdout_stream_scan(s, true);
974                 if (r < 0)
975                         return r;
976
977                 return 0;
978         }
979
980         s->length += l;
981         r = stdout_stream_scan(s, false);
982         if (r < 0)
983                 return r;
984
985         return 1;
986
987 }
988
989 static void stdout_stream_free(StdoutStream *s) {
990         assert(s);
991
992         if (s->server) {
993                 assert(s->server->n_stdout_streams > 0);
994                 s->server->n_stdout_streams --;
995                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
996         }
997
998         if (s->fd >= 0) {
999                 if (s->server)
1000                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1001
1002                 close_nointr_nofail(s->fd);
1003         }
1004
1005         free(s->tag);
1006         free(s);
1007 }
1008
1009 static int stdout_stream_new(Server *s) {
1010         StdoutStream *stream;
1011         int fd, r;
1012         socklen_t len;
1013         struct epoll_event ev;
1014
1015         assert(s);
1016
1017         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1018         if (fd < 0) {
1019                 if (errno == EAGAIN)
1020                         return 0;
1021
1022                 log_error("Failed to accept stdout connection: %m");
1023                 return -errno;
1024         }
1025
1026         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1027                 log_warning("Too many stdout streams, refusing connection.");
1028                 close_nointr_nofail(fd);
1029                 return 0;
1030         }
1031
1032         stream = new0(StdoutStream, 1);
1033         if (!stream) {
1034                 log_error("Out of memory.");
1035                 close_nointr_nofail(fd);
1036                 return -ENOMEM;
1037         }
1038
1039         stream->fd = fd;
1040
1041         len = sizeof(stream->ucred);
1042         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1043                 log_error("Failed to determine peer credentials: %m");
1044                 r = -errno;
1045                 goto fail;
1046         }
1047
1048         if (shutdown(fd, SHUT_WR) < 0) {
1049                 log_error("Failed to shutdown writing side of socket: %m");
1050                 r = -errno;
1051                 goto fail;
1052         }
1053
1054         zero(ev);
1055         ev.data.ptr = stream;
1056         ev.events = EPOLLIN;
1057         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1058                 log_error("Failed to add stream to event loop: %m");
1059                 r = -errno;
1060                 goto fail;
1061         }
1062
1063         stream->server = s;
1064         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1065         s->n_stdout_streams ++;
1066
1067         return 0;
1068
1069 fail:
1070         stdout_stream_free(stream);
1071         return r;
1072 }
1073
1074 static int process_event(Server *s, struct epoll_event *ev) {
1075         assert(s);
1076
1077         if (ev->data.fd == s->signal_fd) {
1078                 struct signalfd_siginfo sfsi;
1079                 ssize_t n;
1080
1081                 if (ev->events != EPOLLIN) {
1082                         log_info("Got invalid event from epoll.");
1083                         return -EIO;
1084                 }
1085
1086                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1087                 if (n != sizeof(sfsi)) {
1088
1089                         if (n >= 0)
1090                                 return -EIO;
1091
1092                         if (errno == EINTR || errno == EAGAIN)
1093                                 return 0;
1094
1095                         return -errno;
1096                 }
1097
1098                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1099                 return 0;
1100
1101         } else if (ev->data.fd == s->native_fd ||
1102                    ev->data.fd == s->syslog_fd) {
1103
1104                 if (ev->events != EPOLLIN) {
1105                         log_info("Got invalid event from epoll.");
1106                         return -EIO;
1107                 }
1108
1109                 for (;;) {
1110                         struct msghdr msghdr;
1111                         struct iovec iovec;
1112                         struct ucred *ucred = NULL;
1113                         struct timeval *tv = NULL;
1114                         struct cmsghdr *cmsg;
1115                         union {
1116                                 struct cmsghdr cmsghdr;
1117                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1118                                             CMSG_SPACE(sizeof(struct timeval))];
1119                         } control;
1120                         ssize_t n;
1121                         int v;
1122
1123                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1124                                 log_error("SIOCINQ failed: %m");
1125                                 return -errno;
1126                         }
1127
1128                         if (v <= 0)
1129                                 return 1;
1130
1131                         if (s->buffer_size < (size_t) v) {
1132                                 void *b;
1133                                 size_t l;
1134
1135                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1136                                 b = realloc(s->buffer, l+1);
1137
1138                                 if (!b) {
1139                                         log_error("Couldn't increase buffer.");
1140                                         return -ENOMEM;
1141                                 }
1142
1143                                 s->buffer_size = l;
1144                                 s->buffer = b;
1145                         }
1146
1147                         zero(iovec);
1148                         iovec.iov_base = s->buffer;
1149                         iovec.iov_len = s->buffer_size;
1150
1151                         zero(control);
1152                         zero(msghdr);
1153                         msghdr.msg_iov = &iovec;
1154                         msghdr.msg_iovlen = 1;
1155                         msghdr.msg_control = &control;
1156                         msghdr.msg_controllen = sizeof(control);
1157
1158                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1159                         if (n < 0) {
1160
1161                                 if (errno == EINTR || errno == EAGAIN)
1162                                         return 1;
1163
1164                                 log_error("recvmsg() failed: %m");
1165                                 return -errno;
1166                         }
1167
1168                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1169
1170                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1171                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1172                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1173                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1174                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1175                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1176                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1177                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1178                         }
1179
1180                         if (ev->data.fd == s->syslog_fd) {
1181                                 char *e;
1182
1183                                 e = memchr(s->buffer, '\n', n);
1184                                 if (e)
1185                                         *e = 0;
1186                                 else
1187                                         s->buffer[n] = 0;
1188
1189                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1190                         } else
1191                                 process_native_message(s, s->buffer, n, ucred, tv);
1192                 }
1193
1194                 return 1;
1195
1196         } else if (ev->data.fd == s->stdout_fd) {
1197
1198                 if (ev->events != EPOLLIN) {
1199                         log_info("Got invalid event from epoll.");
1200                         return -EIO;
1201                 }
1202
1203                 stdout_stream_new(s);
1204                 return 1;
1205
1206         } else {
1207                 StdoutStream *stream;
1208
1209                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1210                         log_info("Got invalid event from epoll.");
1211                         return -EIO;
1212                 }
1213
1214                 /* If it is none of the well-known fds, it must be an
1215                  * stdout stream fd. Note that this is a bit ugly here
1216                  * (since we rely that none of the well-known fds
1217                  * could be interpreted as pointer), but nonetheless
1218                  * safe, since the well-known fds would never get an
1219                  * fd > 4096, i.e. beyond the first memory page */
1220
1221                 stream = ev->data.ptr;
1222
1223                 if (stdout_stream_process(stream) <= 0)
1224                         stdout_stream_free(stream);
1225
1226                 return 1;
1227         }
1228
1229         log_error("Unknown event.");
1230         return 0;
1231 }
1232
1233 static int system_journal_open(Server *s) {
1234         int r;
1235         char *fn;
1236         sd_id128_t machine;
1237         char ids[33];
1238
1239         r = sd_id128_get_machine(&machine);
1240         if (r < 0)
1241                 return r;
1242
1243         /* First try to create the machine path, but not the prefix */
1244         fn = strappend("/var/log/journal/", sd_id128_to_string(machine, ids));
1245         if (!fn)
1246                 return -ENOMEM;
1247         (void) mkdir(fn, 0755);
1248         free(fn);
1249
1250         /* The create the system journal file */
1251         fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1252         if (!fn)
1253                 return -ENOMEM;
1254
1255         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1256         free(fn);
1257
1258         if (r >= 0) {
1259                 s->system_journal->metrics = s->metrics;
1260                 s->system_journal->compress = s->compress;
1261
1262                 fix_perms(s->system_journal, 0);
1263                 return r;
1264         }
1265
1266         if (r < 0 && r != -ENOENT) {
1267                 log_error("Failed to open system journal: %s", strerror(-r));
1268                 return r;
1269         }
1270
1271         /* /var didn't work, so try /run, but this time we
1272          * create the prefix too */
1273         fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1274         if (!fn)
1275                 return -ENOMEM;
1276
1277         (void) mkdir_parents(fn, 0755);
1278         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1279         free(fn);
1280
1281         if (r < 0) {
1282                 log_error("Failed to open runtime journal: %s", strerror(-r));
1283                 return r;
1284         }
1285
1286         s->runtime_journal->metrics = s->metrics;
1287         s->runtime_journal->compress = s->compress;
1288
1289         fix_perms(s->runtime_journal, 0);
1290         return r;
1291 }
1292
1293 static int open_syslog_socket(Server *s) {
1294         union sockaddr_union sa;
1295         int one, r;
1296         struct epoll_event ev;
1297
1298         assert(s);
1299
1300         if (s->syslog_fd < 0) {
1301
1302                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1303                 if (s->syslog_fd < 0) {
1304                         log_error("socket() failed: %m");
1305                         return -errno;
1306                 }
1307
1308                 zero(sa);
1309                 sa.un.sun_family = AF_UNIX;
1310                 strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1311
1312                 unlink(sa.un.sun_path);
1313
1314                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1315                 if (r < 0) {
1316                         log_error("bind() failed: %m");
1317                         return -errno;
1318                 }
1319
1320                 chmod(sa.un.sun_path, 0666);
1321         }
1322
1323         one = 1;
1324         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1325         if (r < 0) {
1326                 log_error("SO_PASSCRED failed: %m");
1327                 return -errno;
1328         }
1329
1330         one = 1;
1331         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1332         if (r < 0) {
1333                 log_error("SO_TIMESTAMP failed: %m");
1334                 return -errno;
1335         }
1336
1337         zero(ev);
1338         ev.events = EPOLLIN;
1339         ev.data.fd = s->syslog_fd;
1340         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1341                 log_error("Failed to add syslog server fd to epoll object: %m");
1342                 return -errno;
1343         }
1344
1345         return 0;
1346 }
1347
1348 static int open_native_socket(Server*s) {
1349         union sockaddr_union sa;
1350         int one, r;
1351         struct epoll_event ev;
1352
1353         assert(s);
1354
1355         if (s->native_fd < 0) {
1356
1357                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1358                 if (s->native_fd < 0) {
1359                         log_error("socket() failed: %m");
1360                         return -errno;
1361                 }
1362
1363                 zero(sa);
1364                 sa.un.sun_family = AF_UNIX;
1365                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1366
1367                 unlink(sa.un.sun_path);
1368
1369                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1370                 if (r < 0) {
1371                         log_error("bind() failed: %m");
1372                         return -errno;
1373                 }
1374
1375                 chmod(sa.un.sun_path, 0666);
1376         }
1377
1378         one = 1;
1379         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1380         if (r < 0) {
1381                 log_error("SO_PASSCRED failed: %m");
1382                 return -errno;
1383         }
1384
1385         one = 1;
1386         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1387         if (r < 0) {
1388                 log_error("SO_TIMESTAMP failed: %m");
1389                 return -errno;
1390         }
1391
1392         zero(ev);
1393         ev.events = EPOLLIN;
1394         ev.data.fd = s->native_fd;
1395         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1396                 log_error("Failed to add native server fd to epoll object: %m");
1397                 return -errno;
1398         }
1399
1400         return 0;
1401 }
1402
1403 static int open_stdout_socket(Server *s) {
1404         union sockaddr_union sa;
1405         int r;
1406         struct epoll_event ev;
1407
1408         assert(s);
1409
1410         if (s->stdout_fd < 0) {
1411
1412                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1413                 if (s->stdout_fd < 0) {
1414                         log_error("socket() failed: %m");
1415                         return -errno;
1416                 }
1417
1418                 zero(sa);
1419                 sa.un.sun_family = AF_UNIX;
1420                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1421
1422                 unlink(sa.un.sun_path);
1423
1424                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1425                 if (r < 0) {
1426                         log_error("bind() failed: %m");
1427                         return -errno;
1428                 }
1429
1430                 chmod(sa.un.sun_path, 0666);
1431
1432                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1433                         log_error("liste() failed: %m");
1434                         return -errno;
1435                 }
1436         }
1437
1438         zero(ev);
1439         ev.events = EPOLLIN;
1440         ev.data.fd = s->stdout_fd;
1441         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1442                 log_error("Failed to add stdout server fd to epoll object: %m");
1443                 return -errno;
1444         }
1445
1446         return 0;
1447 }
1448
1449 static int open_signalfd(Server *s) {
1450         sigset_t mask;
1451         struct epoll_event ev;
1452
1453         assert(s);
1454
1455         assert_se(sigemptyset(&mask) == 0);
1456         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
1457         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1458
1459         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1460         if (s->signal_fd < 0) {
1461                 log_error("signalfd(): %m");
1462                 return -errno;
1463         }
1464
1465         zero(ev);
1466         ev.events = EPOLLIN;
1467         ev.data.fd = s->signal_fd;
1468
1469         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1470                 log_error("epoll_ctl(): %m");
1471                 return -errno;
1472         }
1473
1474         return 0;
1475 }
1476
1477 static int server_init(Server *s) {
1478         int n, r, fd;
1479
1480         assert(s);
1481
1482         zero(*s);
1483         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1484         s->metrics.max_size = DEFAULT_MAX_SIZE;
1485         s->metrics.min_size = DEFAULT_MIN_SIZE;
1486         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1487         s->max_use = DEFAULT_MAX_USE;
1488         s->compress = true;
1489
1490         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1491         if (!s->user_journals) {
1492                 log_error("Out of memory.");
1493                 return -ENOMEM;
1494         }
1495
1496         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1497         if (s->epoll_fd < 0) {
1498                 log_error("Failed to create epoll object: %m");
1499                 return -errno;
1500         }
1501
1502         n = sd_listen_fds(true);
1503         if (n < 0) {
1504                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1505                 return n;
1506         }
1507
1508         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1509
1510                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1511
1512                         if (s->native_fd >= 0) {
1513                                 log_error("Too many native sockets passed.");
1514                                 return -EINVAL;
1515                         }
1516
1517                         s->native_fd = fd;
1518
1519                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1520
1521                         if (s->stdout_fd >= 0) {
1522                                 log_error("Too many stdout sockets passed.");
1523                                 return -EINVAL;
1524                         }
1525
1526                         s->stdout_fd = fd;
1527
1528                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1529
1530                         if (s->syslog_fd >= 0) {
1531                                 log_error("Too many /dev/log sockets passed.");
1532                                 return -EINVAL;
1533                         }
1534
1535                         s->syslog_fd = fd;
1536
1537                 } else {
1538                         log_error("Unknown socket passed.");
1539                         return -EINVAL;
1540                 }
1541         }
1542
1543         r = open_syslog_socket(s);
1544         if (r < 0)
1545                 return r;
1546
1547         r = open_native_socket(s);
1548         if (r < 0)
1549                 return r;
1550
1551         r = open_stdout_socket(s);
1552         if (r < 0)
1553                 return r;
1554
1555         r = system_journal_open(s);
1556         if (r < 0)
1557                 return r;
1558
1559         r = open_signalfd(s);
1560         if (r < 0)
1561                 return r;
1562
1563         s->rate_limit = journal_rate_limit_new(10*USEC_PER_SEC, 2);
1564         if (!s->rate_limit)
1565                 return -ENOMEM;
1566
1567         return 0;
1568 }
1569
1570 static void server_done(Server *s) {
1571         JournalFile *f;
1572         assert(s);
1573
1574         while (s->stdout_streams)
1575                 stdout_stream_free(s->stdout_streams);
1576
1577         if (s->system_journal)
1578                 journal_file_close(s->system_journal);
1579
1580         if (s->runtime_journal)
1581                 journal_file_close(s->runtime_journal);
1582
1583         while ((f = hashmap_steal_first(s->user_journals)))
1584                 journal_file_close(f);
1585
1586         hashmap_free(s->user_journals);
1587
1588         if (s->epoll_fd >= 0)
1589                 close_nointr_nofail(s->epoll_fd);
1590
1591         if (s->signal_fd >= 0)
1592                 close_nointr_nofail(s->signal_fd);
1593
1594         if (s->syslog_fd >= 0)
1595                 close_nointr_nofail(s->syslog_fd);
1596
1597         if (s->native_fd >= 0)
1598                 close_nointr_nofail(s->native_fd);
1599
1600         if (s->stdout_fd >= 0)
1601                 close_nointr_nofail(s->stdout_fd);
1602
1603         if (s->rate_limit)
1604                 journal_rate_limit_free(s->rate_limit);
1605 }
1606
1607 int main(int argc, char *argv[]) {
1608         Server server;
1609         int r;
1610
1611         /* if (getppid() != 1) { */
1612         /*         log_error("This program should be invoked by init only."); */
1613         /*         return EXIT_FAILURE; */
1614         /* } */
1615
1616         if (argc > 1) {
1617                 log_error("This program does not take arguments.");
1618                 return EXIT_FAILURE;
1619         }
1620
1621         log_set_target(LOG_TARGET_CONSOLE);
1622         log_set_max_level(LOG_DEBUG);
1623         log_parse_environment();
1624         log_open();
1625
1626         umask(0022);
1627
1628         r = server_init(&server);
1629         if (r < 0)
1630                 goto finish;
1631
1632         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1633
1634         sd_notify(false,
1635                   "READY=1\n"
1636                   "STATUS=Processing requests...");
1637
1638         for (;;) {
1639                 struct epoll_event event;
1640
1641                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1642                 if (r < 0) {
1643
1644                         if (errno == EINTR)
1645                                 continue;
1646
1647                         log_error("epoll_wait() failed: %m");
1648                         r = -errno;
1649                         goto finish;
1650                 } else if (r == 0)
1651                         break;
1652
1653                 r = process_event(&server, &event);
1654                 if (r < 0)
1655                         goto finish;
1656                 else if (r == 0)
1657                         break;
1658         }
1659
1660         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1661
1662 finish:
1663         sd_notify(false,
1664                   "STATUS=Shutting down...");
1665
1666         server_done(&server);
1667
1668         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1669 }