chiark / gitweb /
networkd: use new rtnl_message_read() API
[elogind.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <stddef.h>
25
26 #ifdef HAVE_SELINUX
27 #include <selinux/selinux.h>
28 #endif
29
30 #include "sd-event.h"
31 #include "socket-util.h"
32 #include "selinux-util.h"
33 #include "journald-server.h"
34 #include "journald-stream.h"
35 #include "journald-syslog.h"
36 #include "journald-kmsg.h"
37 #include "journald-console.h"
38
/* Upper bound on concurrently connected stdout streams; once the server
 * tracks this many, stdout_stream_new() refuses further connections. */
#define STDOUT_STREAMS_MAX 4096
40
/* Parser state of a stdout stream connection. stdout_stream_line()
 * consumes one header line per state, in the order listed here, and then
 * stays in STDOUT_STREAM_RUNNING, where each line is logged as a message. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_IDENTIFIER = 0,     /* next line: identifier (may be empty) */
        STDOUT_STREAM_UNIT_ID,            /* next line: unit id (only honoured for uid 0 peers) */
        STDOUT_STREAM_PRIORITY,           /* next line: default priority, integer in 0..999 */
        STDOUT_STREAM_LEVEL_PREFIX,       /* next line: boolean, parse per-line priority prefixes */
        STDOUT_STREAM_FORWARD_TO_SYSLOG,  /* next line: boolean, forward to syslog */
        STDOUT_STREAM_FORWARD_TO_KMSG,    /* next line: boolean, forward to kmsg */
        STDOUT_STREAM_FORWARD_TO_CONSOLE, /* next line: boolean, forward to console */
        STDOUT_STREAM_RUNNING             /* header complete: every line is a log message */
} StdoutStreamState;
51
52 struct StdoutStream {
53         Server *server;
54         StdoutStreamState state;
55
56         int fd;
57
58         struct ucred ucred;
59 #ifdef HAVE_SELINUX
60         security_context_t security_context;
61 #endif
62
63         char *identifier;
64         char *unit_id;
65         int priority;
66         bool level_prefix:1;
67         bool forward_to_syslog:1;
68         bool forward_to_kmsg:1;
69         bool forward_to_console:1;
70
71         char buffer[LINE_MAX+1];
72         size_t length;
73
74         sd_event_source *event_source;
75
76         LIST_FIELDS(StdoutStream, stdout_stream);
77 };
78
79 static int stdout_stream_log(StdoutStream *s, const char *p) {
80         struct iovec iovec[N_IOVEC_META_FIELDS + 5];
81         int priority;
82         char syslog_priority[] = "PRIORITY=\0";
83         char syslog_facility[sizeof("SYSLOG_FACILITY=") + DECIMAL_STR_MAX(priority)];
84         _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
85         unsigned n = 0;
86         char *label = NULL;
87         size_t label_len = 0;
88
89         assert(s);
90         assert(p);
91
92         if (isempty(p))
93                 return 0;
94
95         priority = s->priority;
96
97         if (s->level_prefix)
98                 syslog_parse_priority(&p, &priority, false);
99
100         if (s->forward_to_syslog || s->server->forward_to_syslog)
101                 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
102
103         if (s->forward_to_kmsg || s->server->forward_to_kmsg)
104                 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
105
106         if (s->forward_to_console || s->server->forward_to_console)
107                 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
108
109         IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
110
111         syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
112         IOVEC_SET_STRING(iovec[n++], syslog_priority);
113
114         if (priority & LOG_FACMASK) {
115                 snprintf(syslog_facility, sizeof(syslog_facility), "SYSLOG_FACILITY=%i", LOG_FAC(priority));
116                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
117         }
118
119         if (s->identifier) {
120                 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
121                 if (syslog_identifier)
122                         IOVEC_SET_STRING(iovec[n++], syslog_identifier);
123         }
124
125         message = strappend("MESSAGE=", p);
126         if (message)
127                 IOVEC_SET_STRING(iovec[n++], message);
128
129 #ifdef HAVE_SELINUX
130         if (s->security_context) {
131                 label = (char*) s->security_context;
132                 label_len = strlen((char*) s->security_context);
133         }
134 #endif
135
136         server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, label, label_len, s->unit_id, priority, 0);
137         return 0;
138 }
139
140 static int stdout_stream_line(StdoutStream *s, char *p) {
141         int r;
142
143         assert(s);
144         assert(p);
145
146         p = strstrip(p);
147
148         switch (s->state) {
149
150         case STDOUT_STREAM_IDENTIFIER:
151                 if (isempty(p))
152                         s->identifier = NULL;
153                 else  {
154                         s->identifier = strdup(p);
155                         if (!s->identifier)
156                                 return log_oom();
157                 }
158
159                 s->state = STDOUT_STREAM_UNIT_ID;
160                 return 0;
161
162         case STDOUT_STREAM_UNIT_ID:
163                 if (s->ucred.uid == 0) {
164                         if (isempty(p))
165                                 s->unit_id = NULL;
166                         else  {
167                                 s->unit_id = strdup(p);
168                                 if (!s->unit_id)
169                                         return log_oom();
170                         }
171                 }
172
173                 s->state = STDOUT_STREAM_PRIORITY;
174                 return 0;
175
176         case STDOUT_STREAM_PRIORITY:
177                 r = safe_atoi(p, &s->priority);
178                 if (r < 0 || s->priority < 0 || s->priority > 999) {
179                         log_warning("Failed to parse log priority line.");
180                         return -EINVAL;
181                 }
182
183                 s->state = STDOUT_STREAM_LEVEL_PREFIX;
184                 return 0;
185
186         case STDOUT_STREAM_LEVEL_PREFIX:
187                 r = parse_boolean(p);
188                 if (r < 0) {
189                         log_warning("Failed to parse level prefix line.");
190                         return -EINVAL;
191                 }
192
193                 s->level_prefix = !!r;
194                 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
195                 return 0;
196
197         case STDOUT_STREAM_FORWARD_TO_SYSLOG:
198                 r = parse_boolean(p);
199                 if (r < 0) {
200                         log_warning("Failed to parse forward to syslog line.");
201                         return -EINVAL;
202                 }
203
204                 s->forward_to_syslog = !!r;
205                 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
206                 return 0;
207
208         case STDOUT_STREAM_FORWARD_TO_KMSG:
209                 r = parse_boolean(p);
210                 if (r < 0) {
211                         log_warning("Failed to parse copy to kmsg line.");
212                         return -EINVAL;
213                 }
214
215                 s->forward_to_kmsg = !!r;
216                 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
217                 return 0;
218
219         case STDOUT_STREAM_FORWARD_TO_CONSOLE:
220                 r = parse_boolean(p);
221                 if (r < 0) {
222                         log_warning("Failed to parse copy to console line.");
223                         return -EINVAL;
224                 }
225
226                 s->forward_to_console = !!r;
227                 s->state = STDOUT_STREAM_RUNNING;
228                 return 0;
229
230         case STDOUT_STREAM_RUNNING:
231                 return stdout_stream_log(s, p);
232         }
233
234         assert_not_reached("Unknown stream state");
235 }
236
237 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
238         char *p;
239         size_t remaining;
240         int r;
241
242         assert(s);
243
244         p = s->buffer;
245         remaining = s->length;
246         for (;;) {
247                 char *end;
248                 size_t skip;
249
250                 end = memchr(p, '\n', remaining);
251                 if (end)
252                         skip = end - p + 1;
253                 else if (remaining >= sizeof(s->buffer) - 1) {
254                         end = p + sizeof(s->buffer) - 1;
255                         skip = remaining;
256                 } else
257                         break;
258
259                 *end = 0;
260
261                 r = stdout_stream_line(s, p);
262                 if (r < 0)
263                         return r;
264
265                 remaining -= skip;
266                 p += skip;
267         }
268
269         if (force_flush && remaining > 0) {
270                 p[remaining] = 0;
271                 r = stdout_stream_line(s, p);
272                 if (r < 0)
273                         return r;
274
275                 p += remaining;
276                 remaining = 0;
277         }
278
279         if (p > s->buffer) {
280                 memmove(s->buffer, p, remaining);
281                 s->length = remaining;
282         }
283
284         return 0;
285 }
286
287 static int stdout_stream_process(sd_event_source *es, int fd, uint32_t revents, void *userdata) {
288         StdoutStream *s = userdata;
289         ssize_t l;
290         int r;
291
292         assert(s);
293
294         if ((revents|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) {
295                 log_error("Got invalid event from epoll for stdout stream: %"PRIx32, revents);
296                 goto terminate;
297         }
298
299         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
300         if (l < 0) {
301
302                 if (errno == EAGAIN)
303                         return 0;
304
305                 log_warning("Failed to read from stream: %m");
306                 goto terminate;
307         }
308
309         if (l == 0) {
310                 stdout_stream_scan(s, true);
311                 goto terminate;
312         }
313
314         s->length += l;
315         r = stdout_stream_scan(s, false);
316         if (r < 0)
317                 goto terminate;
318
319         return 1;
320
321 terminate:
322         stdout_stream_free(s);
323         return 0;
324 }
325
326 void stdout_stream_free(StdoutStream *s) {
327         assert(s);
328
329         if (s->server) {
330                 assert(s->server->n_stdout_streams > 0);
331                 s->server->n_stdout_streams --;
332                 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
333         }
334
335         if (s->event_source) {
336                 sd_event_source_set_enabled(s->event_source, SD_EVENT_OFF);
337                 s->event_source = sd_event_source_unref(s->event_source);
338         }
339
340         if (s->fd >= 0)
341                 close_nointr_nofail(s->fd);
342
343 #ifdef HAVE_SELINUX
344         if (s->security_context)
345                 freecon(s->security_context);
346 #endif
347
348         free(s->identifier);
349         free(s->unit_id);
350         free(s);
351 }
352
353 static int stdout_stream_new(sd_event_source *es, int listen_fd, uint32_t revents, void *userdata) {
354         Server *s = userdata;
355         StdoutStream *stream;
356         int fd, r;
357
358         assert(s);
359
360         if (revents != EPOLLIN) {
361                 log_error("Got invalid event from epoll for stdout server fd: %"PRIx32, revents);
362                 return -EIO;
363         }
364
365         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
366         if (fd < 0) {
367                 if (errno == EAGAIN)
368                         return 0;
369
370                 log_error("Failed to accept stdout connection: %m");
371                 return -errno;
372         }
373
374         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
375                 log_warning("Too many stdout streams, refusing connection.");
376                 close_nointr_nofail(fd);
377                 return 0;
378         }
379
380         stream = new0(StdoutStream, 1);
381         if (!stream) {
382                 close_nointr_nofail(fd);
383                 return log_oom();
384         }
385
386         stream->fd = fd;
387
388         r = getpeercred(fd, &stream->ucred);
389         if (r < 0) {
390                 log_error("Failed to determine peer credentials: %m");
391                 goto fail;
392         }
393
394 #ifdef HAVE_SELINUX
395         if (use_selinux()) {
396                 if (getpeercon(fd, &stream->security_context) < 0 && errno != ENOPROTOOPT)
397                         log_error("Failed to determine peer security context: %m");
398         }
399 #endif
400
401         if (shutdown(fd, SHUT_WR) < 0) {
402                 log_error("Failed to shutdown writing side of socket: %m");
403                 goto fail;
404         }
405
406         r = sd_event_add_io(s->event, &stream->event_source, fd, EPOLLIN, stdout_stream_process, stream);
407         if (r < 0) {
408                 log_error("Failed to add stream to event loop: %s", strerror(-r));
409                 goto fail;
410         }
411
412         r = sd_event_source_set_priority(stream->event_source, SD_EVENT_PRIORITY_NORMAL+5);
413         if (r < 0) {
414                 log_error("Failed to adjust stdout event source priority: %s", strerror(-r));
415                 goto fail;
416         }
417
418         stream->server = s;
419         LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
420         s->n_stdout_streams ++;
421
422         return 0;
423
424 fail:
425         stdout_stream_free(stream);
426         return 0;
427 }
428
429 int server_open_stdout_socket(Server *s) {
430         int r;
431
432         assert(s);
433
434         if (s->stdout_fd < 0) {
435                 union sockaddr_union sa = {
436                         .un.sun_family = AF_UNIX,
437                         .un.sun_path = "/run/systemd/journal/stdout",
438                 };
439
440                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
441                 if (s->stdout_fd < 0) {
442                         log_error("socket() failed: %m");
443                         return -errno;
444                 }
445
446                 unlink(sa.un.sun_path);
447
448                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
449                 if (r < 0) {
450                         log_error("bind() failed: %m");
451                         return -errno;
452                 }
453
454                 chmod(sa.un.sun_path, 0666);
455
456                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
457                         log_error("listen() failed: %m");
458                         return -errno;
459                 }
460         } else
461                 fd_nonblock(s->stdout_fd, 1);
462
463         r = sd_event_add_io(s->event, &s->stdout_event_source, s->stdout_fd, EPOLLIN, stdout_stream_new, s);
464         if (r < 0) {
465                 log_error("Failed to add stdout server fd to event source: %s", strerror(-r));
466                 return r;
467         }
468
469         r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
470         if (r < 0) {
471                 log_error("Failed to adjust priority of stdout server event source: %s", strerror(-r));
472                 return r;
473         }
474
475         return 0;
476 }