chiark / gitweb /
journald: port to sd-event and enable watchdog support
[elogind.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <stddef.h>
25
26 #ifdef HAVE_SELINUX
27 #include <selinux/selinux.h>
28 #endif
29
30 #include "sd-event.h"
31 #include "socket-util.h"
32 #include "selinux-util.h"
33 #include "journald-server.h"
34 #include "journald-stream.h"
35 #include "journald-syslog.h"
36 #include "journald-kmsg.h"
37 #include "journald-console.h"
38
39 #define STDOUT_STREAMS_MAX 4096
40
41 typedef enum StdoutStreamState {
42         STDOUT_STREAM_IDENTIFIER,
43         STDOUT_STREAM_UNIT_ID,
44         STDOUT_STREAM_PRIORITY,
45         STDOUT_STREAM_LEVEL_PREFIX,
46         STDOUT_STREAM_FORWARD_TO_SYSLOG,
47         STDOUT_STREAM_FORWARD_TO_KMSG,
48         STDOUT_STREAM_FORWARD_TO_CONSOLE,
49         STDOUT_STREAM_RUNNING
50 } StdoutStreamState;
51
52 struct StdoutStream {
53         Server *server;
54         StdoutStreamState state;
55
56         int fd;
57
58         struct ucred ucred;
59 #ifdef HAVE_SELINUX
60         security_context_t security_context;
61 #endif
62
63         char *identifier;
64         char *unit_id;
65         int priority;
66         bool level_prefix:1;
67         bool forward_to_syslog:1;
68         bool forward_to_kmsg:1;
69         bool forward_to_console:1;
70
71         char buffer[LINE_MAX+1];
72         size_t length;
73
74         sd_event_source *event_source;
75
76         LIST_FIELDS(StdoutStream, stdout_stream);
77 };
78
79 static int stdout_stream_log(StdoutStream *s, const char *p) {
80         struct iovec iovec[N_IOVEC_META_FIELDS + 5];
81         int priority;
82         char syslog_priority[] = "PRIORITY=\0";
83         char syslog_facility[sizeof("SYSLOG_FACILITY=") + DECIMAL_STR_MAX(priority)];
84         _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
85         unsigned n = 0;
86         char *label = NULL;
87         size_t label_len = 0;
88
89         assert(s);
90         assert(p);
91
92         if (isempty(p))
93                 return 0;
94
95         priority = s->priority;
96
97         if (s->level_prefix)
98                 syslog_parse_priority(&p, &priority, false);
99
100         if (s->forward_to_syslog || s->server->forward_to_syslog)
101                 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
102
103         if (s->forward_to_kmsg || s->server->forward_to_kmsg)
104                 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
105
106         if (s->forward_to_console || s->server->forward_to_console)
107                 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
108
109         IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
110
111         syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
112         IOVEC_SET_STRING(iovec[n++], syslog_priority);
113
114         if (priority & LOG_FACMASK) {
115                 snprintf(syslog_facility, sizeof(syslog_facility), "SYSLOG_FACILITY=%i", LOG_FAC(priority));
116                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
117         }
118
119         if (s->identifier) {
120                 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
121                 if (syslog_identifier)
122                         IOVEC_SET_STRING(iovec[n++], syslog_identifier);
123         }
124
125         message = strappend("MESSAGE=", p);
126         if (message)
127                 IOVEC_SET_STRING(iovec[n++], message);
128
129 #ifdef HAVE_SELINUX
130         if (s->security_context) {
131                 label = (char*) s->security_context;
132                 label_len = strlen((char*) s->security_context);
133         }
134 #endif
135
136         server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, label, label_len, s->unit_id, priority, 0);
137         return 0;
138 }
139
140 static int stdout_stream_line(StdoutStream *s, char *p) {
141         int r;
142
143         assert(s);
144         assert(p);
145
146         p = strstrip(p);
147
148         switch (s->state) {
149
150         case STDOUT_STREAM_IDENTIFIER:
151                 if (isempty(p))
152                         s->identifier = NULL;
153                 else  {
154                         s->identifier = strdup(p);
155                         if (!s->identifier)
156                                 return log_oom();
157                 }
158
159                 s->state = STDOUT_STREAM_UNIT_ID;
160                 return 0;
161
162         case STDOUT_STREAM_UNIT_ID:
163                 if (s->ucred.uid == 0) {
164                         if (isempty(p))
165                                 s->unit_id = NULL;
166                         else  {
167                                 s->unit_id = strdup(p);
168                                 if (!s->unit_id)
169                                         return log_oom();
170                         }
171                 }
172
173                 s->state = STDOUT_STREAM_PRIORITY;
174                 return 0;
175
176         case STDOUT_STREAM_PRIORITY:
177                 r = safe_atoi(p, &s->priority);
178                 if (r < 0 || s->priority < 0 || s->priority > 999) {
179                         log_warning("Failed to parse log priority line.");
180                         return -EINVAL;
181                 }
182
183                 s->state = STDOUT_STREAM_LEVEL_PREFIX;
184                 return 0;
185
186         case STDOUT_STREAM_LEVEL_PREFIX:
187                 r = parse_boolean(p);
188                 if (r < 0) {
189                         log_warning("Failed to parse level prefix line.");
190                         return -EINVAL;
191                 }
192
193                 s->level_prefix = !!r;
194                 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
195                 return 0;
196
197         case STDOUT_STREAM_FORWARD_TO_SYSLOG:
198                 r = parse_boolean(p);
199                 if (r < 0) {
200                         log_warning("Failed to parse forward to syslog line.");
201                         return -EINVAL;
202                 }
203
204                 s->forward_to_syslog = !!r;
205                 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
206                 return 0;
207
208         case STDOUT_STREAM_FORWARD_TO_KMSG:
209                 r = parse_boolean(p);
210                 if (r < 0) {
211                         log_warning("Failed to parse copy to kmsg line.");
212                         return -EINVAL;
213                 }
214
215                 s->forward_to_kmsg = !!r;
216                 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
217                 return 0;
218
219         case STDOUT_STREAM_FORWARD_TO_CONSOLE:
220                 r = parse_boolean(p);
221                 if (r < 0) {
222                         log_warning("Failed to parse copy to console line.");
223                         return -EINVAL;
224                 }
225
226                 s->forward_to_console = !!r;
227                 s->state = STDOUT_STREAM_RUNNING;
228                 return 0;
229
230         case STDOUT_STREAM_RUNNING:
231                 return stdout_stream_log(s, p);
232         }
233
234         assert_not_reached("Unknown stream state");
235 }
236
237 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
238         char *p;
239         size_t remaining;
240         int r;
241
242         assert(s);
243
244         p = s->buffer;
245         remaining = s->length;
246         for (;;) {
247                 char *end;
248                 size_t skip;
249
250                 end = memchr(p, '\n', remaining);
251                 if (end)
252                         skip = end - p + 1;
253                 else if (remaining >= sizeof(s->buffer) - 1) {
254                         end = p + sizeof(s->buffer) - 1;
255                         skip = remaining;
256                 } else
257                         break;
258
259                 *end = 0;
260
261                 r = stdout_stream_line(s, p);
262                 if (r < 0)
263                         return r;
264
265                 remaining -= skip;
266                 p += skip;
267         }
268
269         if (force_flush && remaining > 0) {
270                 p[remaining] = 0;
271                 r = stdout_stream_line(s, p);
272                 if (r < 0)
273                         return r;
274
275                 p += remaining;
276                 remaining = 0;
277         }
278
279         if (p > s->buffer) {
280                 memmove(s->buffer, p, remaining);
281                 s->length = remaining;
282         }
283
284         return 0;
285 }
286
287 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
288         StdoutStream *s = userdata;
289         ssize_t l;
290         int r;
291
292         assert(s);
293
294         if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
295                 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
296                 return -EIO;
297         }
298
299         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
300         if (l < 0) {
301
302                 if (errno == EAGAIN)
303                         return 0;
304
305                 log_warning("Failed to read from stream: %m");
306                 goto fail;
307         }
308
309         if (l == 0) {
310                 r = stdout_stream_scan(s, true);
311                 if (r < 0)
312                         goto fail;
313
314                 return 0;
315         }
316
317         s->length += l;
318         r = stdout_stream_scan(s, false);
319         if (r < 0)
320                 goto fail;
321
322         return 1;
323
324 fail:
325         stdout_stream_free(s);
326         return 0;
327 }
328
329 void stdout_stream_free(StdoutStream *s) {
330         assert(s);
331
332         if (s->server) {
333                 assert(s->server->n_stdout_streams > 0);
334                 s->server->n_stdout_streams --;
335                 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
336         }
337
338         if (s->event_source)
339                 s->event_source = sd_event_source_unref(s->event_source);
340
341         if (s->fd >= 0)
342                 close_nointr_nofail(s->fd);
343
344 #ifdef HAVE_SELINUX
345         if (s->security_context)
346                 freecon(s->security_context);
347 #endif
348
349         free(s->identifier);
350         free(s->unit_id);
351         free(s);
352 }
353
354 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
355         Server *s = userdata;
356         StdoutStream *stream;
357         int fd, r;
358         socklen_t len;
359
360         assert(s);
361
362         if (revents != EPOLLIN) {
363                 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
364                 return -EIO;
365         }
366
367         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
368         if (fd < 0) {
369                 if (errno == EAGAIN)
370                         return 0;
371
372                 log_error("Failed to accept stdout connection: %m");
373                 return -errno;
374         }
375
376         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
377                 log_warning("Too many stdout streams, refusing connection.");
378                 close_nointr_nofail(fd);
379                 return 0;
380         }
381
382         stream = new0(StdoutStream, 1);
383         if (!stream) {
384                 close_nointr_nofail(fd);
385                 return log_oom();
386         }
387
388         stream->fd = fd;
389
390         len = sizeof(stream->ucred);
391         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
392                 log_error("Failed to determine peer credentials: %m");
393                 r = -errno;
394                 goto fail;
395         }
396
397 #ifdef HAVE_SELINUX
398         if (use_selinux()) {
399                 if (getpeercon(fd, &stream->security_context) < 0 && errno != ENOPROTOOPT)
400                         log_error("Failed to determine peer security context: %m");
401         }
402 #endif
403
404         if (shutdown(fd, SHUT_WR) < 0) {
405                 log_error("Failed to shutdown writing side of socket: %m");
406                 r = -errno;
407                 goto fail;
408         }
409
410         r = sd_event_add_io(s->event, fd, EPOLLIN, stdout_stream_process, stream, &stream->event_source);
411         if (r < 0) {
412                 log_error("Failed to add stream to event loop: %s", strerror(-r));
413                 goto fail;
414         }
415
416         r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
417         if (r < 0) {
418                 log_error("Failed to adjust stdout event source priority: %s", strerror(-r));
419                 goto fail;
420         }
421
422         stream->server = s;
423         LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
424         s->n_stdout_streams ++;
425
426         return 0;
427
428 fail:
429         stdout_stream_free(stream);
430         return r;
431 }
432
433 int server_open_stdout_socket(Server *s) {
434         int r;
435
436         assert(s);
437
438         if (s->stdout_fd < 0) {
439                 union sockaddr_union sa = {
440                         .un.sun_family = AF_UNIX,
441                         .un.sun_path = "/run/systemd/journal/stdout",
442                 };
443
444                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
445                 if (s->stdout_fd < 0) {
446                         log_error("socket() failed: %m");
447                         return -errno;
448                 }
449
450                 unlink(sa.un.sun_path);
451
452                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
453                 if (r < 0) {
454                         log_error("bind() failed: %m");
455                         return -errno;
456                 }
457
458                 chmod(sa.un.sun_path, 0666);
459
460                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
461                         log_error("listen() failed: %m");
462                         return -errno;
463                 }
464         } else
465                 fd_nonblock(s->stdout_fd, 1);
466
467         r = sd_event_add_io(s->event, s->stdout_fd, EPOLLIN, stdout_stream_new, s, &s->stdout_event_source);
468         if (r < 0) {
469                 log_error("Failed to add stdout server fd to event source: %s", strerror(-r));
470                 return r;
471         }
472
473         r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
474         if (r < 0) {
475                 log_error("Failed to adjust priority of stdout server event source: %s", strerror(-r));
476                 return r;
477         }
478
479         return 0;
480 }