chiark / gitweb /
c216b7879023a5e6c0b43ba78c5593a77a154010
[elogind.git] / src / journal / journald.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU General Public License as published by
10   the Free Software Foundation; either version 2 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   General Public License for more details.
17
18   You should have received a copy of the GNU General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/epoll.h>
23 #include <sys/socket.h>
24 #include <errno.h>
25 #include <sys/signalfd.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <sys/acl.h>
29 #include <acl/libacl.h>
30 #include <stddef.h>
31 #include <sys/ioctl.h>
32 #include <linux/sockios.h>
33 #include <sys/statvfs.h>
34
35 #include "hashmap.h"
36 #include "journal-file.h"
37 #include "sd-daemon.h"
38 #include "socket-util.h"
39 #include "acl-util.h"
40 #include "cgroup-util.h"
41 #include "list.h"
42 #include "journal-rate-limit.h"
43
/* Upper bound on the number of per-user journal files kept open at once;
 * find_journal() closes entries of the user_journals hashmap until the
 * count drops below this value before opening another one. */
#define USER_JOURNALS_MAX 1024
/* NOTE(review): presumably the cap on concurrently connected stdout
 * streams; the code enforcing it is not visible in this part of the file. */
#define STDOUT_STREAMS_MAX 4096

/* How long a result computed by available_space() is served from the cache
 * before the journal directory is scanned again. */
#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC)

/* Forward declaration, needed because Server embeds a list of these. */
typedef struct StdoutStream StdoutStream;
50
/* Global state of the journal daemon. */
typedef struct Server {
        /* File descriptors owned by the server. NOTE(review): judging by the
         * names these are the epoll instance, the signalfd and the three
         * listening sockets (syslog, native protocol, stdout streams);
         * their setup is not visible in this part of the file. */
        int epoll_fd;
        int signal_fd;
        int syslog_fd;
        int native_fd;
        int stdout_fd;

        /* Journal used while no system journal is open (see find_journal()) */
        JournalFile *runtime_journal;
        /* Journal below /var/log/journal; also the fallback for user entries */
        JournalFile *system_journal;
        /* Per-user journal files, keyed by UID (UINT32_TO_PTR(uid)) */
        Hashmap *user_journals;

        /* Sequence number state handed to journal_file_append_entry() */
        uint64_t seqnum;

        /* NOTE(review): not referenced in this part of the file; presumably
         * the receive buffer for incoming datagrams. */
        char *buffer;
        size_t buffer_size;

        /* State handed to journal_rate_limit_test() */
        JournalRateLimit *rate_limit;

        /* Size limits; copied into every user journal file that is opened */
        JournalMetrics metrics;
        /* Maximum number of bytes the journal files may occupy on disk */
        uint64_t max_use;
        /* Compression setting copied into every user journal file */
        bool compress;

        /* Result of the last available_space() computation and the
         * CLOCK_MONOTONIC time at which it was taken; a timestamp of 0
         * forces a recomputation. */
        uint64_t cached_available_space;
        usec_t cached_available_space_timestamp;

        /* Stdout stream connections and their count */
        LIST_HEAD(StdoutStream, stdout_streams);
        unsigned n_stdout_streams;
} Server;
79
/* Parser state of a stdout stream. Each of the first four states consumes
 * exactly one header line in stdout_stream_line() and advances to the next
 * state; once RUNNING is reached every further line is logged. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_TAG,              /* next line: tag prepended to messages (may be empty) */
        STDOUT_STREAM_PRIORITY,         /* next line: default priority, a single digit 0..7 */
        STDOUT_STREAM_PRIORITY_PREFIX,  /* next line: 0 or 1, whether "<N>" line prefixes are parsed */
        STDOUT_STREAM_TEE_CONSOLE,      /* next line: 0 or 1, whether lines are copied to /dev/console */
        STDOUT_STREAM_RUNNING           /* header complete, lines are log payload */
} StdoutStreamState;
87
/* One stdout stream connection. */
struct StdoutStream {
        /* Server the stream belongs to; messages are dispatched to it */
        Server *server;
        /* Which header line (or payload) is expected next */
        StdoutStreamState state;

        /* Descriptor the stream data is read from */
        int fd;

        /* Credentials passed along with every message logged from this stream */
        struct ucred ucred;

        /* Values taken from the stream header lines */
        char *tag;                      /* heap-allocated, NULL if the tag line was empty */
        int priority;                   /* default priority, 0..7 */
        bool priority_prefix:1;         /* honour a leading "<N>" on each line */
        bool tee_console:1;             /* additionally write lines to /dev/console */

        /* Bytes read but not yet split into lines; `length` bytes are valid */
        char buffer[LINE_MAX+1];
        size_t length;

        /* Linkage for Server.stdout_streams */
        LIST_FIELDS(StdoutStream, stdout_stream);
};
106
107 static uint64_t available_space(Server *s) {
108         char ids[33];
109         sd_id128_t machine;
110         char *p;
111         const char *f;
112         struct statvfs ss;
113         uint64_t sum = 0, avail = 0, ss_avail = 0;
114         int r;
115         DIR *d;
116         usec_t ts = now(CLOCK_MONOTONIC);
117
118         if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts)
119                 return s->cached_available_space;
120
121         r = sd_id128_get_machine(&machine);
122         if (r < 0)
123                 return 0;
124
125         if (s->system_journal)
126                 f = "/var/log/journal/";
127         else
128                 f = "/run/log/journal/";
129
130         p = strappend(f, sd_id128_to_string(machine, ids));
131         if (!p)
132                 return 0;
133
134         d = opendir(p);
135         free(p);
136
137         if (!d)
138                 return 0;
139
140         if (fstatvfs(dirfd(d), &ss) < 0)
141                 goto finish;
142
143         for (;;) {
144                 struct stat st;
145                 struct dirent buf, *de;
146                 int k;
147
148                 k = readdir_r(d, &buf, &de);
149                 if (k != 0) {
150                         r = -k;
151                         goto finish;
152                 }
153
154                 if (!de)
155                         break;
156
157                 if (!dirent_is_file_with_suffix(de, ".journal"))
158                         continue;
159
160                 if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0)
161                         continue;
162
163                 sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize;
164         }
165
166         avail = sum >= s->max_use ? 0 : s->max_use - sum;
167
168         ss_avail = ss.f_bsize * ss.f_bavail;
169
170         ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free;
171
172         if (ss_avail < avail)
173                 avail = ss_avail;
174
175         s->cached_available_space = avail;
176         s->cached_available_space_timestamp = ts;
177
178 finish:
179         closedir(d);
180
181         return avail;
182 }
183
184 static void fix_perms(JournalFile *f, uid_t uid) {
185         acl_t acl;
186         acl_entry_t entry;
187         acl_permset_t permset;
188         int r;
189
190         assert(f);
191
192         r = fchmod_and_fchown(f->fd, 0640, 0, 0);
193         if (r < 0)
194                 log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r));
195
196         if (uid <= 0)
197                 return;
198
199         acl = acl_get_fd(f->fd);
200         if (!acl) {
201                 log_warning("Failed to read ACL on %s, ignoring: %m", f->path);
202                 return;
203         }
204
205         r = acl_find_uid(acl, uid, &entry);
206         if (r <= 0) {
207
208                 if (acl_create_entry(&acl, &entry) < 0 ||
209                     acl_set_tag_type(entry, ACL_USER) < 0 ||
210                     acl_set_qualifier(entry, &uid) < 0) {
211                         log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
212                         goto finish;
213                 }
214         }
215
216         if (acl_get_permset(entry, &permset) < 0 ||
217             acl_add_perm(permset, ACL_READ) < 0 ||
218             acl_calc_mask(&acl) < 0) {
219                 log_warning("Failed to patch ACL on %s, ignoring: %m", f->path);
220                 goto finish;
221         }
222
223         if (acl_set_fd(f->fd, acl) < 0)
224                 log_warning("Failed to set ACL on %s, ignoring: %m", f->path);
225
226 finish:
227         acl_free(acl);
228 }
229
230 static JournalFile* find_journal(Server *s, uid_t uid) {
231         char *p;
232         int r;
233         JournalFile *f;
234         char ids[33];
235         sd_id128_t machine;
236
237         assert(s);
238
239         /* We split up user logs only on /var, not on /run */
240         if (!s->system_journal)
241                 return s->runtime_journal;
242
243         if (uid <= 0)
244                 return s->system_journal;
245
246         r = sd_id128_get_machine(&machine);
247         if (r < 0)
248                 return s->system_journal;
249
250         f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid));
251         if (f)
252                 return f;
253
254         if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0)
255                 return s->system_journal;
256
257         while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) {
258                 /* Too many open? Then let's close one */
259                 f = hashmap_steal_first(s->user_journals);
260                 assert(f);
261                 journal_file_close(f);
262         }
263
264         r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f);
265         free(p);
266
267         if (r < 0)
268                 return s->system_journal;
269
270         fix_perms(f, uid);
271         f->metrics = s->metrics;
272         f->compress = s->compress;
273
274         r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f);
275         if (r < 0) {
276                 journal_file_close(f);
277                 return s->system_journal;
278         }
279
280         return f;
281 }
282
283 static void server_vacuum(Server *s) {
284         Iterator i;
285         void *k;
286         char *p;
287         char ids[33];
288         sd_id128_t machine;
289         int r;
290         JournalFile *f;
291
292         log_info("Rotating...");
293
294         if (s->runtime_journal) {
295                 r = journal_file_rotate(&s->runtime_journal);
296                 if (r < 0)
297                         log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r));
298         }
299
300         if (s->system_journal) {
301                 r = journal_file_rotate(&s->system_journal);
302                 if (r < 0)
303                         log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r));
304         }
305
306         HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) {
307                 r = journal_file_rotate(&f);
308                 if (r < 0)
309                         log_error("Failed to rotate %s: %s", f->path, strerror(-r));
310                 else
311                         hashmap_replace(s->user_journals, k, f);
312         }
313
314         log_info("Vacuuming...");
315
316         r = sd_id128_get_machine(&machine);
317         if (r < 0) {
318                 log_error("Failed to get machine ID: %s", strerror(-r));
319                 return;
320         }
321
322         if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) {
323                 log_error("Out of memory.");
324                 return;
325         }
326
327         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
328         if (r < 0 && r != -ENOENT)
329                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
330         free(p);
331
332         if (asprintf(&p, "/run/log/journal/%s", ids) < 0) {
333                 log_error("Out of memory.");
334                 return;
335         }
336
337         r = journal_directory_vacuum(p, s->max_use, s->metrics.keep_free);
338         if (r < 0 && r != -ENOENT)
339                 log_error("Failed to vacuum %s: %s", p, strerror(-r));
340         free(p);
341
342         s->cached_available_space_timestamp = 0;
343 }
344
345 static char *shortened_cgroup_path(pid_t pid) {
346         int r;
347         char *process_path, *init_path, *path;
348
349         assert(pid > 0);
350
351         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path);
352         if (r < 0)
353                 return NULL;
354
355         r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path);
356         if (r < 0) {
357                 free(process_path);
358                 return NULL;
359         }
360
361         if (streq(init_path, "/"))
362                 init_path[0] = 0;
363
364         if (startswith(process_path, init_path))
365                 path = process_path + strlen(init_path);
366         else
367                 path = process_path;
368
369         free(init_path);
370
371         return path;
372 }
373
374 static void dispatch_message_real(Server *s,
375                              struct iovec *iovec, unsigned n, unsigned m,
376                              struct ucred *ucred,
377                              struct timeval *tv) {
378
379         char *pid = NULL, *uid = NULL, *gid = NULL,
380                 *source_time = NULL, *boot_id = NULL, *machine_id = NULL,
381                 *comm = NULL, *cmdline = NULL, *hostname = NULL,
382                 *audit_session = NULL, *audit_loginuid = NULL,
383                 *exe = NULL, *cgroup = NULL;
384
385         char idbuf[33];
386         sd_id128_t id;
387         int r;
388         char *t;
389         uid_t loginuid = 0, realuid = 0;
390         JournalFile *f;
391         bool vacuumed = false;
392
393         assert(s);
394         assert(iovec);
395         assert(n > 0);
396         assert(n + 13 <= m);
397
398         if (ucred) {
399                 uint32_t session;
400                 char *path;
401
402                 realuid = ucred->uid;
403
404                 if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0)
405                         IOVEC_SET_STRING(iovec[n++], pid);
406
407                 if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0)
408                         IOVEC_SET_STRING(iovec[n++], uid);
409
410                 if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0)
411                         IOVEC_SET_STRING(iovec[n++], gid);
412
413                 r = get_process_comm(ucred->pid, &t);
414                 if (r >= 0) {
415                         comm = strappend("_COMM=", t);
416                         if (comm)
417                                 IOVEC_SET_STRING(iovec[n++], comm);
418                         free(t);
419                 }
420
421                 r = get_process_exe(ucred->pid, &t);
422                 if (r >= 0) {
423                         exe = strappend("_EXE=", t);
424                         if (comm)
425                                 IOVEC_SET_STRING(iovec[n++], exe);
426                         free(t);
427                 }
428
429                 r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t);
430                 if (r >= 0) {
431                         cmdline = strappend("_CMDLINE=", t);
432                         if (cmdline)
433                                 IOVEC_SET_STRING(iovec[n++], cmdline);
434                         free(t);
435                 }
436
437                 r = audit_session_from_pid(ucred->pid, &session);
438                 if (r >= 0)
439                         if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0)
440                                 IOVEC_SET_STRING(iovec[n++], audit_session);
441
442                 r = audit_loginuid_from_pid(ucred->pid, &loginuid);
443                 if (r >= 0)
444                         if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0)
445                                 IOVEC_SET_STRING(iovec[n++], audit_loginuid);
446
447                 path = shortened_cgroup_path(ucred->pid);
448                 if (path) {
449                         cgroup = strappend("_SYSTEMD_CGROUP=", path);
450                         if (cgroup)
451                                 IOVEC_SET_STRING(iovec[n++], cgroup);
452
453                         free(path);
454                 }
455         }
456
457         if (tv) {
458                 if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu",
459                              (unsigned long long) timeval_load(tv)) >= 0)
460                         IOVEC_SET_STRING(iovec[n++], source_time);
461         }
462
463         /* Note that strictly speaking storing the boot id here is
464          * redundant since the entry includes this in-line
465          * anyway. However, we need this indexed, too. */
466         r = sd_id128_get_boot(&id);
467         if (r >= 0)
468                 if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
469                         IOVEC_SET_STRING(iovec[n++], boot_id);
470
471         r = sd_id128_get_machine(&id);
472         if (r >= 0)
473                 if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0)
474                         IOVEC_SET_STRING(iovec[n++], machine_id);
475
476         t = gethostname_malloc();
477         if (t) {
478                 hostname = strappend("_HOSTNAME=", t);
479                 if (hostname)
480                         IOVEC_SET_STRING(iovec[n++], hostname);
481                 free(t);
482         }
483
484         assert(n <= m);
485
486 retry:
487         f = find_journal(s, realuid == 0 ? 0 : loginuid);
488         if (!f)
489                 log_warning("Dropping message, as we can't find a place to store the data.");
490         else {
491                 r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL);
492
493                 if (r == -E2BIG && !vacuumed) {
494                         log_info("Allocation limit reached.");
495
496                         server_vacuum(s);
497                         vacuumed = true;
498
499                         log_info("Retrying write.");
500                         goto retry;
501                 }
502
503                 if (r < 0)
504                         log_error("Failed to write entry, ignoring: %s", strerror(-r));
505         }
506
507         free(pid);
508         free(uid);
509         free(gid);
510         free(comm);
511         free(exe);
512         free(cmdline);
513         free(source_time);
514         free(boot_id);
515         free(machine_id);
516         free(hostname);
517         free(audit_session);
518         free(audit_loginuid);
519         free(cgroup);
520 }
521
522 static void dispatch_message(Server *s,
523                              struct iovec *iovec, unsigned n, unsigned m,
524                              struct ucred *ucred,
525                              struct timeval *tv,
526                              int priority) {
527         int rl;
528         char *path, *c;
529
530         assert(s);
531         assert(iovec || n == 0);
532
533         if (n == 0)
534                 return;
535
536         if (!ucred)
537                 goto finish;
538
539         path = shortened_cgroup_path(ucred->pid);
540         if (!path)
541                 goto finish;
542
543         /* example: /user/lennart/3/foobar
544          *          /system/dbus.service/foobar
545          *
546          * So let's cut of everything past the third /, since that is
547          * wher user directories start */
548
549         c = strchr(path, '/');
550         if (c) {
551                 c = strchr(c+1, '/');
552                 if (c) {
553                         c = strchr(c+1, '/');
554                         if (c)
555                                 *c = 0;
556                 }
557         }
558
559         rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s));
560
561         if (rl == 0) {
562                 free(path);
563                 return;
564         }
565
566         if (rl > 1) {
567                 int j = 0;
568                 char suppress_message[LINE_MAX];
569                 struct iovec suppress_iovec[15];
570
571                 /* Write a suppression message if we suppressed something */
572
573                 snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path);
574                 char_array_0(suppress_message);
575
576                 IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5");
577                 IOVEC_SET_STRING(suppress_iovec[j++], suppress_message);
578
579                 dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL);
580         }
581
582         free(path);
583
584 finish:
585         dispatch_message_real(s, iovec, n, m, ucred, tv);
586 }
587
588 static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) {
589         char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL;
590         struct iovec iovec[16];
591         unsigned n = 0;
592         int priority = LOG_USER | LOG_INFO;
593
594         assert(s);
595         assert(buf);
596
597         parse_syslog_priority((char**) &buf, &priority);
598         skip_syslog_date((char**) &buf);
599
600         if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0)
601                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
602
603         if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0)
604                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
605
606         message = strappend("MESSAGE=", buf);
607         if (message)
608                 IOVEC_SET_STRING(iovec[n++], message);
609
610         dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK);
611
612         free(message);
613         free(syslog_facility);
614         free(syslog_priority);
615 }
616
/* Checks whether the l bytes at p form a field name that clients are allowed
 * to set.
 *
 * This roughly follows the POSIX recommendations for environment variable
 * names, with some extra restrictions:
 * http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html
 *
 * A valid name is 1 to 64 characters long, consists solely of A-Z, 0-9 and
 * '_', and starts with neither a digit nor an underscore (names with a
 * leading underscore are protected). */
static bool valid_user_field(const char *p, size_t l) {
        size_t i;

        /* Length must be within 1..64 */
        if (l == 0 || l > 64)
                return false;

        /* Protected name? */
        if (p[0] == '_')
                return false;

        /* Leading digit? */
        if (p[0] >= '0' && p[0] <= '9')
                return false;

        for (i = 0; i < l; i++) {
                char c = p[i];
                bool upper = c >= 'A' && c <= 'Z';
                bool digit = c >= '0' && c <= '9';

                if (!upper && !digit && c != '_')
                        return false;
        }

        return true;
}
651
652 static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) {
653         struct iovec *iovec = NULL;
654         unsigned n = 0, m = 0, j;
655         const char *p;
656         size_t remaining;
657         int priority = LOG_INFO;
658
659         assert(s);
660         assert(buffer || n == 0);
661
662         p = buffer;
663         remaining = buffer_size;
664
665         while (remaining > 0) {
666                 const char *e, *q;
667
668                 e = memchr(p, '\n', remaining);
669
670                 if (!e) {
671                         /* Trailing noise, let's ignore it, and flush what we collected */
672                         log_debug("Received message with trailing noise, ignoring.");
673                         break;
674                 }
675
676                 if (e == p) {
677                         /* Entry separator */
678                         dispatch_message(s, iovec, n, m, ucred, tv, priority);
679                         n = 0;
680                         priority = LOG_INFO;
681
682                         p++;
683                         remaining--;
684                         continue;
685                 }
686
687                 if (*p == '.' || *p == '#') {
688                         /* Ignore control commands for now, and
689                          * comments too. */
690                         remaining -= (e - p) + 1;
691                         p = e + 1;
692                         continue;
693                 }
694
695                 /* A property follows */
696
697                 if (n+13 >= m) {
698                         struct iovec *c;
699                         unsigned u;
700
701                         u = MAX((n+13U) * 2U, 4U);
702                         c = realloc(iovec, u * sizeof(struct iovec));
703                         if (!c) {
704                                 log_error("Out of memory");
705                                 break;
706                         }
707
708                         iovec = c;
709                         m = u;
710                 }
711
712                 q = memchr(p, '=', e - p);
713                 if (q) {
714                         if (valid_user_field(p, q - p)) {
715                                 /* If the field name starts with an
716                                  * underscore, skip the variable,
717                                  * since that indidates a trusted
718                                  * field */
719                                 iovec[n].iov_base = (char*) p;
720                                 iovec[n].iov_len = e - p;
721                                 n++;
722
723                                 /* We need to determine the priority
724                                  * of this entry for the rate limiting
725                                  * logic */
726                                 if (e - p == 10 &&
727                                     memcmp(p, "PRIORITY=", 10) == 0 &&
728                                     p[10] >= '0' &&
729                                     p[10] <= '9')
730                                         priority = p[10] - '0';
731                         }
732
733                         remaining -= (e - p) + 1;
734                         p = e + 1;
735                         continue;
736                 } else {
737                         uint64_t l;
738                         char *k;
739
740                         if (remaining < e - p + 1 + sizeof(uint64_t) + 1) {
741                                 log_debug("Failed to parse message, ignoring.");
742                                 break;
743                         }
744
745                         memcpy(&l, e + 1, sizeof(uint64_t));
746                         l = le64toh(l);
747
748                         if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 ||
749                             e[1+sizeof(uint64_t)+l] != '\n') {
750                                 log_debug("Failed to parse message, ignoring.");
751                                 break;
752                         }
753
754                         k = malloc((e - p) + 1 + l);
755                         if (!k) {
756                                 log_error("Out of memory");
757                                 break;
758                         }
759
760                         memcpy(k, p, e - p);
761                         k[e - p] = '=';
762                         memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l);
763
764                         if (valid_user_field(p, e - p)) {
765                                 iovec[n].iov_base = k;
766                                 iovec[n].iov_len = (e - p) + 1 + l;
767                                 n++;
768                         } else
769                                 free(k);
770
771                         remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1;
772                         p = e + 1 + sizeof(uint64_t) + l + 1;
773                 }
774         }
775
776         dispatch_message(s, iovec, n, m, ucred, tv, priority);
777
778         for (j = 0; j < n; j++)
779                 if (iovec[j].iov_base < buffer ||
780                     (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size)
781                         free(iovec[j].iov_base);
782 }
783
784 static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) {
785         struct iovec iovec[15];
786         char *message = NULL, *syslog_priority = NULL;
787         unsigned n = 0;
788         size_t tag_len;
789         int priority;
790
791         assert(s);
792         assert(p);
793
794         priority = s->priority;
795
796         if (s->priority_prefix &&
797             l > 3 &&
798             p[0] == '<' &&
799             p[1] >= '0' && p[1] <= '7' &&
800             p[2] == '>') {
801
802                 priority = p[1] - '0';
803                 p += 3;
804                 l -= 3;
805         }
806
807         if (l <= 0)
808                 return 0;
809
810         if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0)
811                 IOVEC_SET_STRING(iovec[n++], syslog_priority);
812
813         tag_len = s->tag ? strlen(s->tag) + 2: 0;
814         message = malloc(8 + tag_len + l);
815         if (message) {
816                 memcpy(message, "MESSAGE=", 8);
817
818                 if (s->tag) {
819                         memcpy(message+8, s->tag, tag_len-2);
820                         memcpy(message+8+tag_len-2, ": ", 2);
821                 }
822
823                 memcpy(message+8+tag_len, p, l);
824                 iovec[n].iov_base = message;
825                 iovec[n].iov_len = 8+tag_len+l;
826                 n++;
827         }
828
829         dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority);
830
831         if (s->tee_console) {
832                 int console;
833
834                 console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC);
835                 if (console >= 0) {
836                         n = 0;
837                         if (s->tag) {
838                                 IOVEC_SET_STRING(iovec[n++], s->tag);
839                                 IOVEC_SET_STRING(iovec[n++], ": ");
840                         }
841
842                         iovec[n].iov_base = (void*) p;
843                         iovec[n].iov_len = l;
844                         n++;
845
846                         IOVEC_SET_STRING(iovec[n++], (char*) "\n");
847
848                         writev(console, iovec, n);
849                 }
850         }
851
852         free(message);
853         free(syslog_priority);
854
855         return 0;
856 }
857
858 static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) {
859         assert(s);
860         assert(p);
861
862         while (l > 0 && strchr(WHITESPACE, *p)) {
863                 l--;
864                 p++;
865         }
866
867         while (l > 0 && strchr(WHITESPACE, *(p+l-1)))
868                 l--;
869
870         switch (s->state) {
871
872         case STDOUT_STREAM_TAG:
873
874                 if (l > 0) {
875                         s->tag = strndup(p, l);
876                         if (!s->tag) {
877                                 log_error("Out of memory");
878                                 return -EINVAL;
879                         }
880                 }
881
882                 s->state = STDOUT_STREAM_PRIORITY;
883                 return 0;
884
885         case STDOUT_STREAM_PRIORITY:
886                 if (l != 1 || *p < '0' || *p > '7') {
887                         log_warning("Failed to parse log priority line.");
888                         return -EINVAL;
889                 }
890
891                 s->priority = *p - '0';
892                 s->state = STDOUT_STREAM_PRIORITY_PREFIX;
893                 return 0;
894
895         case STDOUT_STREAM_PRIORITY_PREFIX:
896                 if (l != 1 || *p < '0' || *p > '1') {
897                         log_warning("Failed to parse priority prefix line.");
898                         return -EINVAL;
899                 }
900
901                 s->priority_prefix = *p - '0';
902                 s->state = STDOUT_STREAM_TEE_CONSOLE;
903                 return 0;
904
905         case STDOUT_STREAM_TEE_CONSOLE:
906                 if (l != 1 || *p < '0' || *p > '1') {
907                         log_warning("Failed to parse tee to console line.");
908                         return -EINVAL;
909                 }
910
911                 s->tee_console = *p - '0';
912                 s->state = STDOUT_STREAM_RUNNING;
913                 return 0;
914
915         case STDOUT_STREAM_RUNNING:
916                 return stdout_stream_log(s, p, l);
917         }
918
919         assert_not_reached("Unknown stream state");
920 }
921
922 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
923         char *p;
924         size_t remaining;
925         int r;
926
927         assert(s);
928
929         p = s->buffer;
930         remaining = s->length;
931         for (;;) {
932                 char *end;
933                 size_t skip;
934
935                 end = memchr(p, '\n', remaining);
936                 if (!end) {
937                         if (remaining >= LINE_MAX) {
938                                 end = p + LINE_MAX;
939                                 skip = LINE_MAX;
940                         } else
941                                 break;
942                 } else
943                         skip = end - p + 1;
944
945                 r = stdout_stream_line(s, p, end - p);
946                 if (r < 0)
947                         return r;
948
949                 remaining -= skip;
950                 p += skip;
951         }
952
953         if (force_flush && remaining > 0) {
954                 r = stdout_stream_line(s, p, remaining);
955                 if (r < 0)
956                         return r;
957
958                 p += remaining;
959                 remaining = 0;
960         }
961
962         if (p > s->buffer) {
963                 memmove(s->buffer, p, remaining);
964                 s->length = remaining;
965         }
966
967         return 0;
968 }
969
970 static int stdout_stream_process(StdoutStream *s) {
971         ssize_t l;
972         int r;
973
974         assert(s);
975
976         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
977         if (l < 0) {
978
979                 if (errno == EAGAIN)
980                         return 0;
981
982                 log_warning("Failed to read from stream: %m");
983                 return -errno;
984         }
985
986         if (l == 0) {
987                 r = stdout_stream_scan(s, true);
988                 if (r < 0)
989                         return r;
990
991                 return 0;
992         }
993
994         s->length += l;
995         r = stdout_stream_scan(s, false);
996         if (r < 0)
997                 return r;
998
999         return 1;
1000
1001 }
1002
1003 static void stdout_stream_free(StdoutStream *s) {
1004         assert(s);
1005
1006         if (s->server) {
1007                 assert(s->server->n_stdout_streams > 0);
1008                 s->server->n_stdout_streams --;
1009                 LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s);
1010         }
1011
1012         if (s->fd >= 0) {
1013                 if (s->server)
1014                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
1015
1016                 close_nointr_nofail(s->fd);
1017         }
1018
1019         free(s->tag);
1020         free(s);
1021 }
1022
1023 static int stdout_stream_new(Server *s) {
1024         StdoutStream *stream;
1025         int fd, r;
1026         socklen_t len;
1027         struct epoll_event ev;
1028
1029         assert(s);
1030
1031         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
1032         if (fd < 0) {
1033                 if (errno == EAGAIN)
1034                         return 0;
1035
1036                 log_error("Failed to accept stdout connection: %m");
1037                 return -errno;
1038         }
1039
1040         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
1041                 log_warning("Too many stdout streams, refusing connection.");
1042                 close_nointr_nofail(fd);
1043                 return 0;
1044         }
1045
1046         stream = new0(StdoutStream, 1);
1047         if (!stream) {
1048                 log_error("Out of memory.");
1049                 close_nointr_nofail(fd);
1050                 return -ENOMEM;
1051         }
1052
1053         stream->fd = fd;
1054
1055         len = sizeof(stream->ucred);
1056         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
1057                 log_error("Failed to determine peer credentials: %m");
1058                 r = -errno;
1059                 goto fail;
1060         }
1061
1062         if (shutdown(fd, SHUT_WR) < 0) {
1063                 log_error("Failed to shutdown writing side of socket: %m");
1064                 r = -errno;
1065                 goto fail;
1066         }
1067
1068         zero(ev);
1069         ev.data.ptr = stream;
1070         ev.events = EPOLLIN;
1071         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
1072                 log_error("Failed to add stream to event loop: %m");
1073                 r = -errno;
1074                 goto fail;
1075         }
1076
1077         stream->server = s;
1078         LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream);
1079         s->n_stdout_streams ++;
1080
1081         return 0;
1082
1083 fail:
1084         stdout_stream_free(stream);
1085         return r;
1086 }
1087
1088 static int process_event(Server *s, struct epoll_event *ev) {
1089         assert(s);
1090
1091         if (ev->data.fd == s->signal_fd) {
1092                 struct signalfd_siginfo sfsi;
1093                 ssize_t n;
1094
1095                 if (ev->events != EPOLLIN) {
1096                         log_info("Got invalid event from epoll.");
1097                         return -EIO;
1098                 }
1099
1100                 n = read(s->signal_fd, &sfsi, sizeof(sfsi));
1101                 if (n != sizeof(sfsi)) {
1102
1103                         if (n >= 0)
1104                                 return -EIO;
1105
1106                         if (errno == EINTR || errno == EAGAIN)
1107                                 return 0;
1108
1109                         return -errno;
1110                 }
1111
1112                 log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));
1113                 return 0;
1114
1115         } else if (ev->data.fd == s->native_fd ||
1116                    ev->data.fd == s->syslog_fd) {
1117
1118                 if (ev->events != EPOLLIN) {
1119                         log_info("Got invalid event from epoll.");
1120                         return -EIO;
1121                 }
1122
1123                 for (;;) {
1124                         struct msghdr msghdr;
1125                         struct iovec iovec;
1126                         struct ucred *ucred = NULL;
1127                         struct timeval *tv = NULL;
1128                         struct cmsghdr *cmsg;
1129                         union {
1130                                 struct cmsghdr cmsghdr;
1131                                 uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) +
1132                                             CMSG_SPACE(sizeof(struct timeval))];
1133                         } control;
1134                         ssize_t n;
1135                         int v;
1136
1137                         if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) {
1138                                 log_error("SIOCINQ failed: %m");
1139                                 return -errno;
1140                         }
1141
1142                         if (v <= 0)
1143                                 return 1;
1144
1145                         if (s->buffer_size < (size_t) v) {
1146                                 void *b;
1147                                 size_t l;
1148
1149                                 l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2);
1150                                 b = realloc(s->buffer, l+1);
1151
1152                                 if (!b) {
1153                                         log_error("Couldn't increase buffer.");
1154                                         return -ENOMEM;
1155                                 }
1156
1157                                 s->buffer_size = l;
1158                                 s->buffer = b;
1159                         }
1160
1161                         zero(iovec);
1162                         iovec.iov_base = s->buffer;
1163                         iovec.iov_len = s->buffer_size;
1164
1165                         zero(control);
1166                         zero(msghdr);
1167                         msghdr.msg_iov = &iovec;
1168                         msghdr.msg_iovlen = 1;
1169                         msghdr.msg_control = &control;
1170                         msghdr.msg_controllen = sizeof(control);
1171
1172                         n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT);
1173                         if (n < 0) {
1174
1175                                 if (errno == EINTR || errno == EAGAIN)
1176                                         return 1;
1177
1178                                 log_error("recvmsg() failed: %m");
1179                                 return -errno;
1180                         }
1181
1182                         for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) {
1183
1184                                 if (cmsg->cmsg_level == SOL_SOCKET &&
1185                                     cmsg->cmsg_type == SCM_CREDENTIALS &&
1186                                     cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred)))
1187                                         ucred = (struct ucred*) CMSG_DATA(cmsg);
1188                                 else if (cmsg->cmsg_level == SOL_SOCKET &&
1189                                          cmsg->cmsg_type == SO_TIMESTAMP &&
1190                                          cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval)))
1191                                         tv = (struct timeval*) CMSG_DATA(cmsg);
1192                         }
1193
1194                         if (ev->data.fd == s->syslog_fd) {
1195                                 char *e;
1196
1197                                 e = memchr(s->buffer, '\n', n);
1198                                 if (e)
1199                                         *e = 0;
1200                                 else
1201                                         s->buffer[n] = 0;
1202
1203                                 process_syslog_message(s, strstrip(s->buffer), ucred, tv);
1204                         } else
1205                                 process_native_message(s, s->buffer, n, ucred, tv);
1206                 }
1207
1208                 return 1;
1209
1210         } else if (ev->data.fd == s->stdout_fd) {
1211
1212                 if (ev->events != EPOLLIN) {
1213                         log_info("Got invalid event from epoll.");
1214                         return -EIO;
1215                 }
1216
1217                 stdout_stream_new(s);
1218                 return 1;
1219
1220         } else {
1221                 StdoutStream *stream;
1222
1223                 if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
1224                         log_info("Got invalid event from epoll.");
1225                         return -EIO;
1226                 }
1227
1228                 /* If it is none of the well-known fds, it must be an
1229                  * stdout stream fd. Note that this is a bit ugly here
1230                  * (since we rely that none of the well-known fds
1231                  * could be interpreted as pointer), but nonetheless
1232                  * safe, since the well-known fds would never get an
1233                  * fd > 4096, i.e. beyond the first memory page */
1234
1235                 stream = ev->data.ptr;
1236
1237                 if (stdout_stream_process(stream) <= 0)
1238                         stdout_stream_free(stream);
1239
1240                 return 1;
1241         }
1242
1243         log_error("Unknown event.");
1244         return 0;
1245 }
1246
1247 static int system_journal_open(Server *s) {
1248         int r;
1249         char *fn;
1250         sd_id128_t machine;
1251         char ids[33];
1252
1253         r = sd_id128_get_machine(&machine);
1254         if (r < 0)
1255                 return r;
1256
1257         /* First try to create the machine path, but not the prefix */
1258         fn = strappend("/var/log/journal/", sd_id128_to_string(machine, ids));
1259         if (!fn)
1260                 return -ENOMEM;
1261         (void) mkdir(fn, 0755);
1262         free(fn);
1263
1264         /* The create the system journal file */
1265         fn = join("/var/log/journal/", ids, "/system.journal", NULL);
1266         if (!fn)
1267                 return -ENOMEM;
1268
1269         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal);
1270         free(fn);
1271
1272         if (r >= 0) {
1273                 s->system_journal->metrics = s->metrics;
1274                 s->system_journal->compress = s->compress;
1275
1276                 fix_perms(s->system_journal, 0);
1277                 return r;
1278         }
1279
1280         if (r < 0 && r != -ENOENT) {
1281                 log_error("Failed to open system journal: %s", strerror(-r));
1282                 return r;
1283         }
1284
1285         /* /var didn't work, so try /run, but this time we
1286          * create the prefix too */
1287         fn = join("/run/log/journal/", ids, "/system.journal", NULL);
1288         if (!fn)
1289                 return -ENOMEM;
1290
1291         (void) mkdir_parents(fn, 0755);
1292         r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal);
1293         free(fn);
1294
1295         if (r < 0) {
1296                 log_error("Failed to open runtime journal: %s", strerror(-r));
1297                 return r;
1298         }
1299
1300         s->runtime_journal->metrics = s->metrics;
1301         s->runtime_journal->compress = s->compress;
1302
1303         fix_perms(s->runtime_journal, 0);
1304         return r;
1305 }
1306
1307 static int open_syslog_socket(Server *s) {
1308         union sockaddr_union sa;
1309         int one, r;
1310         struct epoll_event ev;
1311
1312         assert(s);
1313
1314         if (s->syslog_fd < 0) {
1315
1316                 s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1317                 if (s->syslog_fd < 0) {
1318                         log_error("socket() failed: %m");
1319                         return -errno;
1320                 }
1321
1322                 zero(sa);
1323                 sa.un.sun_family = AF_UNIX;
1324                 strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path));
1325
1326                 unlink(sa.un.sun_path);
1327
1328                 r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1329                 if (r < 0) {
1330                         log_error("bind() failed: %m");
1331                         return -errno;
1332                 }
1333
1334                 chmod(sa.un.sun_path, 0666);
1335         }
1336
1337         one = 1;
1338         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1339         if (r < 0) {
1340                 log_error("SO_PASSCRED failed: %m");
1341                 return -errno;
1342         }
1343
1344         one = 1;
1345         r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1346         if (r < 0) {
1347                 log_error("SO_TIMESTAMP failed: %m");
1348                 return -errno;
1349         }
1350
1351         zero(ev);
1352         ev.events = EPOLLIN;
1353         ev.data.fd = s->syslog_fd;
1354         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) {
1355                 log_error("Failed to add syslog server fd to epoll object: %m");
1356                 return -errno;
1357         }
1358
1359         return 0;
1360 }
1361
1362 static int open_native_socket(Server*s) {
1363         union sockaddr_union sa;
1364         int one, r;
1365         struct epoll_event ev;
1366
1367         assert(s);
1368
1369         if (s->native_fd < 0) {
1370
1371                 s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0);
1372                 if (s->native_fd < 0) {
1373                         log_error("socket() failed: %m");
1374                         return -errno;
1375                 }
1376
1377                 zero(sa);
1378                 sa.un.sun_family = AF_UNIX;
1379                 strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path));
1380
1381                 unlink(sa.un.sun_path);
1382
1383                 r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1384                 if (r < 0) {
1385                         log_error("bind() failed: %m");
1386                         return -errno;
1387                 }
1388
1389                 chmod(sa.un.sun_path, 0666);
1390         }
1391
1392         one = 1;
1393         r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one));
1394         if (r < 0) {
1395                 log_error("SO_PASSCRED failed: %m");
1396                 return -errno;
1397         }
1398
1399         one = 1;
1400         r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one));
1401         if (r < 0) {
1402                 log_error("SO_TIMESTAMP failed: %m");
1403                 return -errno;
1404         }
1405
1406         zero(ev);
1407         ev.events = EPOLLIN;
1408         ev.data.fd = s->native_fd;
1409         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) {
1410                 log_error("Failed to add native server fd to epoll object: %m");
1411                 return -errno;
1412         }
1413
1414         return 0;
1415 }
1416
1417 static int open_stdout_socket(Server *s) {
1418         union sockaddr_union sa;
1419         int r;
1420         struct epoll_event ev;
1421
1422         assert(s);
1423
1424         if (s->stdout_fd < 0) {
1425
1426                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0);
1427                 if (s->stdout_fd < 0) {
1428                         log_error("socket() failed: %m");
1429                         return -errno;
1430                 }
1431
1432                 zero(sa);
1433                 sa.un.sun_family = AF_UNIX;
1434                 strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path));
1435
1436                 unlink(sa.un.sun_path);
1437
1438                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
1439                 if (r < 0) {
1440                         log_error("bind() failed: %m");
1441                         return -errno;
1442                 }
1443
1444                 chmod(sa.un.sun_path, 0666);
1445
1446                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
1447                         log_error("liste() failed: %m");
1448                         return -errno;
1449                 }
1450         }
1451
1452         zero(ev);
1453         ev.events = EPOLLIN;
1454         ev.data.fd = s->stdout_fd;
1455         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
1456                 log_error("Failed to add stdout server fd to epoll object: %m");
1457                 return -errno;
1458         }
1459
1460         return 0;
1461 }
1462
1463 static int open_signalfd(Server *s) {
1464         sigset_t mask;
1465         struct epoll_event ev;
1466
1467         assert(s);
1468
1469         assert_se(sigemptyset(&mask) == 0);
1470         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
1471         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
1472
1473         s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC);
1474         if (s->signal_fd < 0) {
1475                 log_error("signalfd(): %m");
1476                 return -errno;
1477         }
1478
1479         zero(ev);
1480         ev.events = EPOLLIN;
1481         ev.data.fd = s->signal_fd;
1482
1483         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) {
1484                 log_error("epoll_ctl(): %m");
1485                 return -errno;
1486         }
1487
1488         return 0;
1489 }
1490
1491 static int server_init(Server *s) {
1492         int n, r, fd;
1493
1494         assert(s);
1495
1496         zero(*s);
1497         s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1;
1498         s->metrics.max_size = DEFAULT_MAX_SIZE;
1499         s->metrics.min_size = DEFAULT_MIN_SIZE;
1500         s->metrics.keep_free = DEFAULT_KEEP_FREE;
1501         s->max_use = DEFAULT_MAX_USE;
1502         s->compress = true;
1503
1504         s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func);
1505         if (!s->user_journals) {
1506                 log_error("Out of memory.");
1507                 return -ENOMEM;
1508         }
1509
1510         s->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
1511         if (s->epoll_fd < 0) {
1512                 log_error("Failed to create epoll object: %m");
1513                 return -errno;
1514         }
1515
1516         n = sd_listen_fds(true);
1517         if (n < 0) {
1518                 log_error("Failed to read listening file descriptors from environment: %s", strerror(-n));
1519                 return n;
1520         }
1521
1522         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
1523
1524                 if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) {
1525
1526                         if (s->native_fd >= 0) {
1527                                 log_error("Too many native sockets passed.");
1528                                 return -EINVAL;
1529                         }
1530
1531                         s->native_fd = fd;
1532
1533                 } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) {
1534
1535                         if (s->stdout_fd >= 0) {
1536                                 log_error("Too many stdout sockets passed.");
1537                                 return -EINVAL;
1538                         }
1539
1540                         s->stdout_fd = fd;
1541
1542                 } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) {
1543
1544                         if (s->syslog_fd >= 0) {
1545                                 log_error("Too many /dev/log sockets passed.");
1546                                 return -EINVAL;
1547                         }
1548
1549                         s->syslog_fd = fd;
1550
1551                 } else {
1552                         log_error("Unknown socket passed.");
1553                         return -EINVAL;
1554                 }
1555         }
1556
1557         r = open_syslog_socket(s);
1558         if (r < 0)
1559                 return r;
1560
1561         r = open_native_socket(s);
1562         if (r < 0)
1563                 return r;
1564
1565         r = open_stdout_socket(s);
1566         if (r < 0)
1567                 return r;
1568
1569         r = system_journal_open(s);
1570         if (r < 0)
1571                 return r;
1572
1573         r = open_signalfd(s);
1574         if (r < 0)
1575                 return r;
1576
1577         s->rate_limit = journal_rate_limit_new(10*USEC_PER_SEC, 2);
1578         if (!s->rate_limit)
1579                 return -ENOMEM;
1580
1581         return 0;
1582 }
1583
1584 static void server_done(Server *s) {
1585         JournalFile *f;
1586         assert(s);
1587
1588         while (s->stdout_streams)
1589                 stdout_stream_free(s->stdout_streams);
1590
1591         if (s->system_journal)
1592                 journal_file_close(s->system_journal);
1593
1594         if (s->runtime_journal)
1595                 journal_file_close(s->runtime_journal);
1596
1597         while ((f = hashmap_steal_first(s->user_journals)))
1598                 journal_file_close(f);
1599
1600         hashmap_free(s->user_journals);
1601
1602         if (s->epoll_fd >= 0)
1603                 close_nointr_nofail(s->epoll_fd);
1604
1605         if (s->signal_fd >= 0)
1606                 close_nointr_nofail(s->signal_fd);
1607
1608         if (s->syslog_fd >= 0)
1609                 close_nointr_nofail(s->syslog_fd);
1610
1611         if (s->native_fd >= 0)
1612                 close_nointr_nofail(s->native_fd);
1613
1614         if (s->stdout_fd >= 0)
1615                 close_nointr_nofail(s->stdout_fd);
1616
1617         if (s->rate_limit)
1618                 journal_rate_limit_free(s->rate_limit);
1619 }
1620
1621 int main(int argc, char *argv[]) {
1622         Server server;
1623         int r;
1624
1625         /* if (getppid() != 1) { */
1626         /*         log_error("This program should be invoked by init only."); */
1627         /*         return EXIT_FAILURE; */
1628         /* } */
1629
1630         if (argc > 1) {
1631                 log_error("This program does not take arguments.");
1632                 return EXIT_FAILURE;
1633         }
1634
1635         log_set_target(LOG_TARGET_CONSOLE);
1636         log_set_max_level(LOG_DEBUG);
1637         log_parse_environment();
1638         log_open();
1639
1640         umask(0022);
1641
1642         r = server_init(&server);
1643         if (r < 0)
1644                 goto finish;
1645
1646         log_debug("systemd-journald running as pid %lu", (unsigned long) getpid());
1647
1648         sd_notify(false,
1649                   "READY=1\n"
1650                   "STATUS=Processing requests...");
1651
1652         for (;;) {
1653                 struct epoll_event event;
1654
1655                 r = epoll_wait(server.epoll_fd, &event, 1, -1);
1656                 if (r < 0) {
1657
1658                         if (errno == EINTR)
1659                                 continue;
1660
1661                         log_error("epoll_wait() failed: %m");
1662                         r = -errno;
1663                         goto finish;
1664                 } else if (r == 0)
1665                         break;
1666
1667                 r = process_event(&server, &event);
1668                 if (r < 0)
1669                         goto finish;
1670                 else if (r == 0)
1671                         break;
1672         }
1673
1674         log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid());
1675
1676 finish:
1677         sd_notify(false,
1678                   "STATUS=Shutting down...");
1679
1680         server_done(&server);
1681
1682         return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS;
1683 }