chiark / gitweb /
journald: malloc less when streaming messages
[elogind.git] / src / journal / journald-stream.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2011 Lennart Poettering
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <fcntl.h>
23 #include <unistd.h>
24 #include <stddef.h>
25 #include <sys/epoll.h>
26
27 #ifdef HAVE_SELINUX
28 #include <selinux/selinux.h>
29 #endif
30
31 #include "socket-util.h"
32 #include "selinux-util.h"
33 #include "journald-server.h"
34 #include "journald-stream.h"
35 #include "journald-syslog.h"
36 #include "journald-kmsg.h"
37 #include "journald-console.h"
38
/* Upper bound on concurrently registered stdout streams: stdout_stream_new()
 * refuses a freshly accepted connection once the server's n_stdout_streams
 * counter has reached this value. */
#define STDOUT_STREAMS_MAX 4096
40
/* Parser state of a stdout stream connection.
 *
 * stdout_stream_line() consumes one header line per state, in the order listed
 * here, and advances to the next state after each successfully parsed line.
 * Once STDOUT_STREAM_RUNNING is reached every further line is handed to
 * stdout_stream_log() as a log message. */
typedef enum StdoutStreamState {
        STDOUT_STREAM_IDENTIFIER,         /* expecting the identifier line (may be empty) */
        STDOUT_STREAM_UNIT_ID,            /* expecting the unit id line (only stored when peer uid is 0) */
        STDOUT_STREAM_PRIORITY,           /* expecting the priority as an integer in 0..999 */
        STDOUT_STREAM_LEVEL_PREFIX,       /* expecting a boolean: parse a priority prefix per line? */
        STDOUT_STREAM_FORWARD_TO_SYSLOG,  /* expecting a boolean: forward to syslog? */
        STDOUT_STREAM_FORWARD_TO_KMSG,    /* expecting a boolean: forward to kmsg? */
        STDOUT_STREAM_FORWARD_TO_CONSOLE, /* expecting a boolean: forward to console? */
        STDOUT_STREAM_RUNNING             /* header complete, lines are log payload */
} StdoutStreamState;
51
/* Per-connection state of one client attached to the stdout stream socket. */
struct StdoutStream {
        /* Server the stream is registered with; assigned at the end of
         * stdout_stream_new(), so it is still unset on that function's
         * failure path. */
        Server *server;
        /* Which header line (or payload) is expected next. */
        StdoutStreamState state;

        /* Connection socket returned by accept4(). */
        int fd;

        /* Peer credentials queried with SO_PEERCRED when the stream is created. */
        struct ucred ucred;
#ifdef HAVE_SELINUX
        /* Peer security context from getpeercon(); released with freecon(). */
        security_context_t security_context;
#endif

        /* Heap copy of the identifier header line, or NULL if it was empty. */
        char *identifier;
        /* Heap copy of the unit id header line; only stored for peers with uid 0. */
        char *unit_id;
        /* Priority from the header; stdout_stream_log() starts from this value. */
        int priority;
        /* When set, stdout_stream_log() runs syslog_parse_priority() on each line. */
        bool level_prefix:1;
        /* Per-stream forwarding switches; OR-ed with the server-wide settings. */
        bool forward_to_syslog:1;
        bool forward_to_kmsg:1;
        bool forward_to_console:1;

        /* Receive buffer. The extra byte leaves room for a terminating NUL
         * after LINE_MAX bytes of data. */
        char buffer[LINE_MAX+1];
        /* Number of bytes currently held in buffer. */
        size_t length;

        /* Linkage for the server's stdout_streams list. */
        LIST_FIELDS(StdoutStream, stdout_stream);
};
76
77 static int stdout_stream_log(StdoutStream *s, const char *p) {
78         struct iovec iovec[N_IOVEC_META_FIELDS + 5];
79         int priority;
80         char syslog_priority[] = "PRIORITY=\0";
81         char syslog_facility[sizeof("SYSLOG_FACILITY=") + DECIMAL_STR_MAX(priority)];
82         _cleanup_free_ char *message = NULL, *syslog_identifier = NULL;
83         unsigned n = 0;
84         char *label = NULL;
85         size_t label_len = 0;
86
87         assert(s);
88         assert(p);
89
90         if (isempty(p))
91                 return 0;
92
93         priority = s->priority;
94
95         if (s->level_prefix)
96                 syslog_parse_priority(&p, &priority, false);
97
98         if (s->forward_to_syslog || s->server->forward_to_syslog)
99                 server_forward_syslog(s->server, syslog_fixup_facility(priority), s->identifier, p, &s->ucred, NULL);
100
101         if (s->forward_to_kmsg || s->server->forward_to_kmsg)
102                 server_forward_kmsg(s->server, priority, s->identifier, p, &s->ucred);
103
104         if (s->forward_to_console || s->server->forward_to_console)
105                 server_forward_console(s->server, priority, s->identifier, p, &s->ucred);
106
107         IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout");
108
109         syslog_priority[strlen("PRIORITY=")] = '0' + LOG_PRI(priority);
110         IOVEC_SET_STRING(iovec[n++], syslog_priority);
111
112         if (priority & LOG_FACMASK) {
113                 snprintf(syslog_facility, sizeof(syslog_facility), "SYSLOG_FACILITY=%i", LOG_FAC(priority));
114                 IOVEC_SET_STRING(iovec[n++], syslog_facility);
115         }
116
117         if (s->identifier) {
118                 syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier);
119                 if (syslog_identifier)
120                         IOVEC_SET_STRING(iovec[n++], syslog_identifier);
121         }
122
123         message = strappend("MESSAGE=", p);
124         if (message)
125                 IOVEC_SET_STRING(iovec[n++], message);
126
127 #ifdef HAVE_SELINUX
128         if (s->security_context) {
129                 label = (char*) s->security_context;
130                 label_len = strlen((char*) s->security_context);
131         }
132 #endif
133
134         server_dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, label, label_len, s->unit_id, priority, 0);
135         return 0;
136 }
137
138 static int stdout_stream_line(StdoutStream *s, char *p) {
139         int r;
140
141         assert(s);
142         assert(p);
143
144         p = strstrip(p);
145
146         switch (s->state) {
147
148         case STDOUT_STREAM_IDENTIFIER:
149                 if (isempty(p))
150                         s->identifier = NULL;
151                 else  {
152                         s->identifier = strdup(p);
153                         if (!s->identifier)
154                                 return log_oom();
155                 }
156
157                 s->state = STDOUT_STREAM_UNIT_ID;
158                 return 0;
159
160         case STDOUT_STREAM_UNIT_ID:
161                 if (s->ucred.uid == 0) {
162                         if (isempty(p))
163                                 s->unit_id = NULL;
164                         else  {
165                                 s->unit_id = strdup(p);
166                                 if (!s->unit_id)
167                                         return log_oom();
168                         }
169                 }
170
171                 s->state = STDOUT_STREAM_PRIORITY;
172                 return 0;
173
174         case STDOUT_STREAM_PRIORITY:
175                 r = safe_atoi(p, &s->priority);
176                 if (r < 0 || s->priority < 0 || s->priority > 999) {
177                         log_warning("Failed to parse log priority line.");
178                         return -EINVAL;
179                 }
180
181                 s->state = STDOUT_STREAM_LEVEL_PREFIX;
182                 return 0;
183
184         case STDOUT_STREAM_LEVEL_PREFIX:
185                 r = parse_boolean(p);
186                 if (r < 0) {
187                         log_warning("Failed to parse level prefix line.");
188                         return -EINVAL;
189                 }
190
191                 s->level_prefix = !!r;
192                 s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG;
193                 return 0;
194
195         case STDOUT_STREAM_FORWARD_TO_SYSLOG:
196                 r = parse_boolean(p);
197                 if (r < 0) {
198                         log_warning("Failed to parse forward to syslog line.");
199                         return -EINVAL;
200                 }
201
202                 s->forward_to_syslog = !!r;
203                 s->state = STDOUT_STREAM_FORWARD_TO_KMSG;
204                 return 0;
205
206         case STDOUT_STREAM_FORWARD_TO_KMSG:
207                 r = parse_boolean(p);
208                 if (r < 0) {
209                         log_warning("Failed to parse copy to kmsg line.");
210                         return -EINVAL;
211                 }
212
213                 s->forward_to_kmsg = !!r;
214                 s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE;
215                 return 0;
216
217         case STDOUT_STREAM_FORWARD_TO_CONSOLE:
218                 r = parse_boolean(p);
219                 if (r < 0) {
220                         log_warning("Failed to parse copy to console line.");
221                         return -EINVAL;
222                 }
223
224                 s->forward_to_console = !!r;
225                 s->state = STDOUT_STREAM_RUNNING;
226                 return 0;
227
228         case STDOUT_STREAM_RUNNING:
229                 return stdout_stream_log(s, p);
230         }
231
232         assert_not_reached("Unknown stream state");
233 }
234
235 static int stdout_stream_scan(StdoutStream *s, bool force_flush) {
236         char *p;
237         size_t remaining;
238         int r;
239
240         assert(s);
241
242         p = s->buffer;
243         remaining = s->length;
244         for (;;) {
245                 char *end;
246                 size_t skip;
247
248                 end = memchr(p, '\n', remaining);
249                 if (end)
250                         skip = end - p + 1;
251                 else if (remaining >= sizeof(s->buffer) - 1) {
252                         end = p + sizeof(s->buffer) - 1;
253                         skip = remaining;
254                 } else
255                         break;
256
257                 *end = 0;
258
259                 r = stdout_stream_line(s, p);
260                 if (r < 0)
261                         return r;
262
263                 remaining -= skip;
264                 p += skip;
265         }
266
267         if (force_flush && remaining > 0) {
268                 p[remaining] = 0;
269                 r = stdout_stream_line(s, p);
270                 if (r < 0)
271                         return r;
272
273                 p += remaining;
274                 remaining = 0;
275         }
276
277         if (p > s->buffer) {
278                 memmove(s->buffer, p, remaining);
279                 s->length = remaining;
280         }
281
282         return 0;
283 }
284
285 int stdout_stream_process(StdoutStream *s) {
286         ssize_t l;
287         int r;
288
289         assert(s);
290
291         l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length);
292         if (l < 0) {
293
294                 if (errno == EAGAIN)
295                         return 0;
296
297                 log_warning("Failed to read from stream: %m");
298                 return -errno;
299         }
300
301         if (l == 0) {
302                 r = stdout_stream_scan(s, true);
303                 if (r < 0)
304                         return r;
305
306                 return 0;
307         }
308
309         s->length += l;
310         r = stdout_stream_scan(s, false);
311         if (r < 0)
312                 return r;
313
314         return 1;
315
316 }
317
318 void stdout_stream_free(StdoutStream *s) {
319         assert(s);
320
321         if (s->server) {
322                 assert(s->server->n_stdout_streams > 0);
323                 s->server->n_stdout_streams --;
324                 LIST_REMOVE(stdout_stream, s->server->stdout_streams, s);
325         }
326
327         if (s->fd >= 0) {
328                 if (s->server)
329                         epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL);
330
331                 close_nointr_nofail(s->fd);
332         }
333
334 #ifdef HAVE_SELINUX
335         if (s->security_context)
336                 freecon(s->security_context);
337 #endif
338
339         free(s->identifier);
340         free(s->unit_id);
341         free(s);
342 }
343
344 int stdout_stream_new(Server *s) {
345         StdoutStream *stream;
346         int fd, r;
347         socklen_t len;
348         struct epoll_event ev;
349
350         assert(s);
351
352         fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC);
353         if (fd < 0) {
354                 if (errno == EAGAIN)
355                         return 0;
356
357                 log_error("Failed to accept stdout connection: %m");
358                 return -errno;
359         }
360
361         if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) {
362                 log_warning("Too many stdout streams, refusing connection.");
363                 close_nointr_nofail(fd);
364                 return 0;
365         }
366
367         stream = new0(StdoutStream, 1);
368         if (!stream) {
369                 close_nointr_nofail(fd);
370                 return log_oom();
371         }
372
373         stream->fd = fd;
374
375         len = sizeof(stream->ucred);
376         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) {
377                 log_error("Failed to determine peer credentials: %m");
378                 r = -errno;
379                 goto fail;
380         }
381
382 #ifdef HAVE_SELINUX
383         if (use_selinux()) {
384                 if (getpeercon(fd, &stream->security_context) < 0 && errno != ENOPROTOOPT)
385                         log_error("Failed to determine peer security context: %m");
386         }
387 #endif
388
389         if (shutdown(fd, SHUT_WR) < 0) {
390                 log_error("Failed to shutdown writing side of socket: %m");
391                 r = -errno;
392                 goto fail;
393         }
394
395         zero(ev);
396         ev.data.ptr = stream;
397         ev.events = EPOLLIN;
398         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
399                 log_error("Failed to add stream to event loop: %m");
400                 r = -errno;
401                 goto fail;
402         }
403
404         stream->server = s;
405         LIST_PREPEND(stdout_stream, s->stdout_streams, stream);
406         s->n_stdout_streams ++;
407
408         return 0;
409
410 fail:
411         stdout_stream_free(stream);
412         return r;
413 }
414
415 int server_open_stdout_socket(Server *s) {
416         int r;
417         struct epoll_event ev = { .events = EPOLLIN };
418
419         assert(s);
420
421         if (s->stdout_fd < 0) {
422                 union sockaddr_union sa = {
423                         .un.sun_family = AF_UNIX,
424                         .un.sun_path = "/run/systemd/journal/stdout",
425                 };
426
427                 s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0);
428                 if (s->stdout_fd < 0) {
429                         log_error("socket() failed: %m");
430                         return -errno;
431                 }
432
433                 unlink(sa.un.sun_path);
434
435                 r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path));
436                 if (r < 0) {
437                         log_error("bind() failed: %m");
438                         return -errno;
439                 }
440
441                 chmod(sa.un.sun_path, 0666);
442
443                 if (listen(s->stdout_fd, SOMAXCONN) < 0) {
444                         log_error("listen() failed: %m");
445                         return -errno;
446                 }
447         } else
448                 fd_nonblock(s->stdout_fd, 1);
449
450         ev.data.fd = s->stdout_fd;
451         if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) {
452                 log_error("Failed to add stdout server fd to epoll object: %m");
453                 return -errno;
454         }
455
456         return 0;
457 }