chiark / gitweb /
journal-remote: allow splitting incoming logs by source host
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 safe_close(source->fd);
34         }
35         free(source->name);
36         free(source->buf);
37         iovw_free_contents(&source->iovw);
38
39         sd_event_source_unref(source->event);
40
41         free(source);
42 }
43
44 static int get_line(RemoteSource *source, char **line, size_t *size) {
45         ssize_t n, remain;
46         char *c = NULL;
47         char *newbuf = NULL;
48         size_t newsize = 0;
49
50         assert(source);
51         assert(source->state == STATE_LINE);
52         assert(source->filled <= source->size);
53         assert(source->buf == NULL || source->size > 0);
54
55         while (true) {
56                 if (source->buf)
57                         c = memchr(source->buf + source->scanned, '\n',
58                                    source->filled - source->scanned);
59                 if (c != NULL)
60                         break;
61
62                 source->scanned = source->filled;
63                 if (source->scanned >= DATA_SIZE_MAX) {
64                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
65                         return -E2BIG;
66                 }
67
68                 if (source->fd < 0)
69                         /* we have to wait for some data to come to us */
70                         return -EWOULDBLOCK;
71
72                 if (source->size - source->filled < LINE_CHUNK &&
73                     !GREEDY_REALLOC(source->buf, source->size,
74                                     MAX(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
75                                 return log_oom();
76
77                 assert(source->size - source->filled >= LINE_CHUNK);
78
79                 n = read(source->fd, source->buf + source->filled,
80                          MAX(source->size, DATA_SIZE_MAX) - source->filled);
81                 if (n < 0) {
82                         if (errno != EAGAIN && errno != EWOULDBLOCK)
83                                 log_error("read(%d, ..., %zd): %m", source->fd,
84                                           source->size - source->filled);
85                         return -errno;
86                 } else if (n == 0)
87                         return 0;
88
89                 source->filled += n;
90         }
91
92         *line = source->buf;
93         *size = c + 1 - source->buf;
94
95         /* Check if something remains */
96         remain = source->buf + source->filled - c - 1;
97         assert(remain >= 0);
98         if (remain) {
99                 newsize = MAX(remain, LINE_CHUNK);
100                 newbuf = malloc(newsize);
101                 if (!newbuf)
102                         return log_oom();
103                 memcpy(newbuf, c + 1, remain);
104         }
105         source->buf = newbuf;
106         source->size = newsize;
107         source->filled = remain;
108         source->scanned = 0;
109
110         return 1;
111 }
112
113 int push_data(RemoteSource *source, const char *data, size_t size) {
114         assert(source);
115         assert(source->state != STATE_EOF);
116
117         if (!GREEDY_REALLOC(source->buf, source->size,
118                             source->filled + size)) {
119                 log_error("Failed to store received data of size %zu "
120                           "(in addition to existing %zu bytes with %zu filled): %s",
121                           size, source->size, source->filled, strerror(ENOMEM));
122                 return -ENOMEM;
123         }
124
125         memcpy(source->buf + source->filled, data, size);
126         source->filled += size;
127
128         return 0;
129 }
130
131 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
132         int n;
133         char *newbuf = NULL;
134         size_t newsize = 0, remain;
135
136         assert(source);
137         assert(source->state == STATE_DATA_START ||
138                source->state == STATE_DATA ||
139                source->state == STATE_DATA_FINISH);
140         assert(size <= DATA_SIZE_MAX);
141         assert(source->filled <= source->size);
142         assert(source->scanned <= source->filled);
143         assert(source->buf != NULL || source->size == 0);
144         assert(source->buf == NULL || source->size > 0);
145         assert(data);
146
147         while(source->filled < size) {
148                 if (source->fd < 0)
149                         /* we have to wait for some data to come to us */
150                         return -EWOULDBLOCK;
151
152                 if (!GREEDY_REALLOC(source->buf, source->size, size))
153                         return log_oom();
154
155                 n = read(source->fd, source->buf + source->filled,
156                          source->size - source->filled);
157                 if (n < 0) {
158                         if (errno != EAGAIN && errno != EWOULDBLOCK)
159                                 log_error("read(%d, ..., %zd): %m", source->fd,
160                                           source->size - source->filled);
161                         return -errno;
162                 } else if (n == 0)
163                         return 0;
164
165                 source->filled += n;
166         }
167
168         *data = source->buf;
169
170         /* Check if something remains */
171         assert(size <= source->filled);
172         remain = source->filled - size;
173         if (remain) {
174                 newsize = MAX(remain, LINE_CHUNK);
175                 newbuf = malloc(newsize);
176                 if (!newbuf)
177                         return log_oom();
178                 memcpy(newbuf, source->buf + size, remain);
179         }
180         source->buf = newbuf;
181         source->size = newsize;
182         source->filled = remain;
183         source->scanned = 0;
184
185         return 1;
186 }
187
188 static int get_data_size(RemoteSource *source) {
189         int r;
190         _cleanup_free_ void *data = NULL;
191
192         assert(source);
193         assert(source->state == STATE_DATA_START);
194         assert(source->data_size == 0);
195
196         r = fill_fixed_size(source, &data, sizeof(uint64_t));
197         if (r <= 0)
198                 return r;
199
200         source->data_size = le64toh( *(uint64_t *) data );
201         if (source->data_size > DATA_SIZE_MAX) {
202                 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
203                           source->data_size, DATA_SIZE_MAX);
204                 return -EINVAL;
205         }
206         if (source->data_size == 0)
207                 log_warning("Binary field with zero length");
208
209         return 1;
210 }
211
212 static int get_data_data(RemoteSource *source, void **data) {
213         int r;
214
215         assert(source);
216         assert(data);
217         assert(source->state == STATE_DATA);
218
219         r = fill_fixed_size(source, data, source->data_size);
220         if (r <= 0)
221                 return r;
222
223         return 1;
224 }
225
226 static int get_data_newline(RemoteSource *source) {
227         int r;
228         _cleanup_free_ char *data = NULL;
229
230         assert(source);
231         assert(source->state == STATE_DATA_FINISH);
232
233         r = fill_fixed_size(source, (void**) &data, 1);
234         if (r <= 0)
235                 return r;
236
237         assert(data);
238         if (*data != '\n') {
239                 log_error("expected newline, got '%c'", *data);
240                 return -EINVAL;
241         }
242
243         return 1;
244 }
245
246 static int process_dunder(RemoteSource *source, char *line, size_t n) {
247         const char *timestamp;
248         int r;
249
250         assert(line);
251         assert(n > 0);
252         assert(line[n-1] == '\n');
253
254         /* XXX: is it worth to support timestamps in extended format?
255          * We don't produce them, but who knows... */
256
257         timestamp = startswith(line, "__CURSOR=");
258         if (timestamp)
259                 /* ignore __CURSOR */
260                 return 1;
261
262         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
263         if (timestamp) {
264                 long long unsigned x;
265                 line[n-1] = '\0';
266                 r = safe_atollu(timestamp, &x);
267                 if (r < 0)
268                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
269                 else
270                         source->ts.realtime = x;
271                 return r < 0 ? r : 1;
272         }
273
274         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
275         if (timestamp) {
276                 long long unsigned x;
277                 line[n-1] = '\0';
278                 r = safe_atollu(timestamp, &x);
279                 if (r < 0)
280                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
281                 else
282                         source->ts.monotonic = x;
283                 return r < 0 ? r : 1;
284         }
285
286         timestamp = startswith(line, "__");
287         if (timestamp) {
288                 log_notice("Unknown dunder line %s", line);
289                 return 1;
290         }
291
292         /* no dunder */
293         return 0;
294 }
295
296 int process_data(RemoteSource *source) {
297         int r;
298
299         switch(source->state) {
300         case STATE_LINE: {
301                 char *line, *sep;
302                 size_t n;
303
304                 assert(source->data_size == 0);
305
306                 r = get_line(source, &line, &n);
307                 if (r < 0)
308                         return r;
309                 if (r == 0) {
310                         source->state = STATE_EOF;
311                         return r;
312                 }
313                 assert(n > 0);
314                 assert(line[n-1] == '\n');
315
316                 if (n == 1) {
317                         log_debug("Received empty line, event is ready");
318                         free(line);
319                         return 1;
320                 }
321
322                 r = process_dunder(source, line, n);
323                 if (r != 0) {
324                         free(line);
325                         return r < 0 ? r : 0;
326                 }
327
328                 /* MESSAGE=xxx\n
329                    or
330                    COREDUMP\n
331                    LLLLLLLL0011223344...\n
332                 */
333                 sep = memchr(line, '=', n);
334                 if (sep)
335                         /* chomp newline */
336                         n--;
337                 else
338                         /* replace \n with = */
339                         line[n-1] = '=';
340                 log_debug("Received: %.*s", (int) n, line);
341
342                 r = iovw_put(&source->iovw, line, n);
343                 if (r < 0) {
344                         log_error("Failed to put line in iovect");
345                         free(line);
346                         return r;
347                 }
348
349                 if (!sep)
350                         source->state = STATE_DATA_START;
351                 return 0; /* continue */
352         }
353
354         case STATE_DATA_START:
355                 assert(source->data_size == 0);
356
357                 r = get_data_size(source);
358                 log_debug("get_data_size() -> %d", r);
359                 if (r < 0)
360                         return r;
361                 if (r == 0) {
362                         source->state = STATE_EOF;
363                         return 0;
364                 }
365
366                 source->state = source->data_size > 0 ?
367                         STATE_DATA : STATE_DATA_FINISH;
368
369                 return 0; /* continue */
370
371         case STATE_DATA: {
372                 void *data;
373
374                 assert(source->data_size > 0);
375
376                 r = get_data_data(source, &data);
377                 log_debug("get_data_data() -> %d", r);
378                 if (r < 0)
379                         return r;
380                 if (r == 0) {
381                         source->state = STATE_EOF;
382                         return 0;
383                 }
384
385                 assert(data);
386
387                 r = iovw_put(&source->iovw, data, source->data_size);
388                 if (r < 0) {
389                         log_error("failed to put binary buffer in iovect");
390                         return r;
391                 }
392
393                 source->state = STATE_DATA_FINISH;
394
395                 return 0; /* continue */
396         }
397
398         case STATE_DATA_FINISH:
399                 r = get_data_newline(source);
400                 log_debug("get_data_newline() -> %d", r);
401                 if (r < 0)
402                         return r;
403                 if (r == 0) {
404                         source->state = STATE_EOF;
405                         return 0;
406                 }
407
408                 source->data_size = 0;
409                 source->state = STATE_LINE;
410
411                 return 0; /* continue */
412         default:
413                 assert_not_reached("wtf?");
414         }
415 }
416
417 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
418         int r;
419
420         assert(source);
421         assert(writer);
422
423         r = process_data(source);
424         if (r <= 0)
425                 return r;
426
427         /* We have a full event */
428         log_info("Received a full event from source@%p fd:%d (%s)",
429                  source, source->fd, source->name);
430
431         if (!source->iovw.count) {
432                 log_warning("Entry with no payload, skipping");
433                 goto freeing;
434         }
435
436         assert(source->iovw.iovec);
437         assert(source->iovw.count);
438
439         r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
440         if (r < 0)
441                 log_error("Failed to write entry of %zu bytes: %s",
442                           iovw_size(&source->iovw), strerror(-r));
443         else
444                 r = 1;
445
446  freeing:
447         iovw_free_contents(&source->iovw);
448         return r;
449 }