chiark / gitweb /
Remove src/journal
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 8*1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0 && !source->passive_fd) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 safe_close(source->fd);
34         }
35
36         free(source->name);
37         free(source->buf);
38         iovw_free_contents(&source->iovw);
39
40         log_debug("Writer ref count %i", source->writer->n_ref);
41         writer_unref(source->writer);
42
43         sd_event_source_unref(source->event);
44         sd_event_source_unref(source->buffer_event);
45
46         free(source);
47 }
48
49 /**
50  * Initialize zero-filled source with given values. On success, takes
51  * ownerhship of fd and writer, otherwise does not touch them.
52  */
53 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
54
55         RemoteSource *source;
56
57         log_debug("Creating source for %sfd:%d (%s)",
58                   passive_fd ? "passive " : "", fd, name);
59
60         assert(fd >= 0);
61
62         source = new0(RemoteSource, 1);
63         if (!source)
64                 return NULL;
65
66         source->fd = fd;
67         source->passive_fd = passive_fd;
68         source->name = name;
69         source->writer = writer;
70
71         return source;
72 }
73
74 static char* realloc_buffer(RemoteSource *source, size_t size) {
75         char *b, *old = source->buf;
76
77         b = GREEDY_REALLOC(source->buf, source->size, size);
78         if (!b)
79                 return NULL;
80
81         iovw_rebase(&source->iovw, old, source->buf);
82
83         return b;
84 }
85
86 static int get_line(RemoteSource *source, char **line, size_t *size) {
87         ssize_t n;
88         char *c = NULL;
89
90         assert(source);
91         assert(source->state == STATE_LINE);
92         assert(source->offset <= source->filled);
93         assert(source->filled <= source->size);
94         assert(source->buf == NULL || source->size > 0);
95         assert(source->fd >= 0);
96
97         while (true) {
98                 if (source->buf) {
99                         size_t start = MAX(source->scanned, source->offset);
100
101                         c = memchr(source->buf + start, '\n',
102                                    source->filled - start);
103                         if (c != NULL)
104                                 break;
105                 }
106
107                 source->scanned = source->filled;
108                 if (source->scanned >= DATA_SIZE_MAX) {
109                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
110                         return -E2BIG;
111                 }
112
113                 if (source->passive_fd)
114                         /* we have to wait for some data to come to us */
115                         return -EAGAIN;
116
117                 /* We know that source->filled is at most DATA_SIZE_MAX, so if
118                    we reallocate it, we'll increase the size at least a bit. */
119                 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
120                 if (source->size - source->filled < LINE_CHUNK &&
121                     !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
122                                 return log_oom();
123
124                 assert(source->buf);
125                 assert(source->size - source->filled >= LINE_CHUNK ||
126                        source->size == ENTRY_SIZE_MAX);
127
128                 n = read(source->fd,
129                          source->buf + source->filled,
130                          source->size - source->filled);
131                 if (n < 0) {
132                         if (errno != EAGAIN)
133                                 log_error_errno(errno, "read(%d, ..., %zu): %m",
134                                                 source->fd,
135                                                 source->size - source->filled);
136                         return -errno;
137                 } else if (n == 0)
138                         return 0;
139
140                 source->filled += n;
141         }
142
143         *line = source->buf + source->offset;
144         *size = c + 1 - source->buf - source->offset;
145         source->offset += *size;
146
147         return 1;
148 }
149
150 int push_data(RemoteSource *source, const char *data, size_t size) {
151         assert(source);
152         assert(source->state != STATE_EOF);
153
154         if (!realloc_buffer(source, source->filled + size)) {
155                 log_error("Failed to store received data of size %zu "
156                           "(in addition to existing %zu bytes with %zu filled): %s",
157                           size, source->size, source->filled, strerror(ENOMEM));
158                 return -ENOMEM;
159         }
160
161         memcpy(source->buf + source->filled, data, size);
162         source->filled += size;
163
164         return 0;
165 }
166
167 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
168
169         assert(source);
170         assert(source->state == STATE_DATA_START ||
171                source->state == STATE_DATA ||
172                source->state == STATE_DATA_FINISH);
173         assert(size <= DATA_SIZE_MAX);
174         assert(source->offset <= source->filled);
175         assert(source->filled <= source->size);
176         assert(source->buf != NULL || source->size == 0);
177         assert(source->buf == NULL || source->size > 0);
178         assert(source->fd >= 0);
179         assert(data);
180
181         while (source->filled - source->offset < size) {
182                 int n;
183
184                 if (source->passive_fd)
185                         /* we have to wait for some data to come to us */
186                         return -EAGAIN;
187
188                 if (!realloc_buffer(source, source->offset + size))
189                         return log_oom();
190
191                 n = read(source->fd, source->buf + source->filled,
192                          source->size - source->filled);
193                 if (n < 0) {
194                         if (errno != EAGAIN)
195                                 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
196                                                 source->size - source->filled);
197                         return -errno;
198                 } else if (n == 0)
199                         return 0;
200
201                 source->filled += n;
202         }
203
204         *data = source->buf + source->offset;
205         source->offset += size;
206
207         return 1;
208 }
209
210 static int get_data_size(RemoteSource *source) {
211         int r;
212         void *data;
213
214         assert(source);
215         assert(source->state == STATE_DATA_START);
216         assert(source->data_size == 0);
217
218         r = fill_fixed_size(source, &data, sizeof(uint64_t));
219         if (r <= 0)
220                 return r;
221
222         source->data_size = le64toh( *(uint64_t *) data );
223         if (source->data_size > DATA_SIZE_MAX) {
224                 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
225                           source->data_size, DATA_SIZE_MAX);
226                 return -EINVAL;
227         }
228         if (source->data_size == 0)
229                 log_warning("Binary field with zero length");
230
231         return 1;
232 }
233
234 static int get_data_data(RemoteSource *source, void **data) {
235         int r;
236
237         assert(source);
238         assert(data);
239         assert(source->state == STATE_DATA);
240
241         r = fill_fixed_size(source, data, source->data_size);
242         if (r <= 0)
243                 return r;
244
245         return 1;
246 }
247
248 static int get_data_newline(RemoteSource *source) {
249         int r;
250         char *data;
251
252         assert(source);
253         assert(source->state == STATE_DATA_FINISH);
254
255         r = fill_fixed_size(source, (void**) &data, 1);
256         if (r <= 0)
257                 return r;
258
259         assert(data);
260         if (*data != '\n') {
261                 log_error("expected newline, got '%c'", *data);
262                 return -EINVAL;
263         }
264
265         return 1;
266 }
267
268 static int process_dunder(RemoteSource *source, char *line, size_t n) {
269         const char *timestamp;
270         int r;
271
272         assert(line);
273         assert(n > 0);
274         assert(line[n-1] == '\n');
275
276         /* XXX: is it worth to support timestamps in extended format?
277          * We don't produce them, but who knows... */
278
279         timestamp = startswith(line, "__CURSOR=");
280         if (timestamp)
281                 /* ignore __CURSOR */
282                 return 1;
283
284         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
285         if (timestamp) {
286                 long long unsigned x;
287                 line[n-1] = '\0';
288                 r = safe_atollu(timestamp, &x);
289                 if (r < 0)
290                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
291                 else
292                         source->ts.realtime = x;
293                 return r < 0 ? r : 1;
294         }
295
296         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
297         if (timestamp) {
298                 long long unsigned x;
299                 line[n-1] = '\0';
300                 r = safe_atollu(timestamp, &x);
301                 if (r < 0)
302                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
303                 else
304                         source->ts.monotonic = x;
305                 return r < 0 ? r : 1;
306         }
307
308         timestamp = startswith(line, "__");
309         if (timestamp) {
310                 log_notice("Unknown dunder line %s", line);
311                 return 1;
312         }
313
314         /* no dunder */
315         return 0;
316 }
317
318 static int process_data(RemoteSource *source) {
319         int r;
320
321         switch(source->state) {
322         case STATE_LINE: {
323                 char *line, *sep;
324                 size_t n = 0;
325
326                 assert(source->data_size == 0);
327
328                 r = get_line(source, &line, &n);
329                 if (r < 0)
330                         return r;
331                 if (r == 0) {
332                         source->state = STATE_EOF;
333                         return r;
334                 }
335                 assert(n > 0);
336                 assert(line[n-1] == '\n');
337
338                 if (n == 1) {
339                         log_trace("Received empty line, event is ready");
340                         return 1;
341                 }
342
343                 r = process_dunder(source, line, n);
344                 if (r != 0)
345                         return r < 0 ? r : 0;
346
347                 /* MESSAGE=xxx\n
348                    or
349                    COREDUMP\n
350                    LLLLLLLL0011223344...\n
351                 */
352                 sep = memchr(line, '=', n);
353                 if (sep) {
354                         /* chomp newline */
355                         n--;
356
357                         r = iovw_put(&source->iovw, line, n);
358                         if (r < 0)
359                                 return r;
360                 } else {
361                         /* replace \n with = */
362                         line[n-1] = '=';
363
364                         source->field_len = n;
365                         source->state = STATE_DATA_START;
366
367                         /* we cannot put the field in iovec until we have all data */
368                 }
369
370                 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
371
372                 return 0; /* continue */
373         }
374
375         case STATE_DATA_START:
376                 assert(source->data_size == 0);
377
378                 r = get_data_size(source);
379                 // log_debug("get_data_size() -> %d", r);
380                 if (r < 0)
381                         return r;
382                 if (r == 0) {
383                         source->state = STATE_EOF;
384                         return 0;
385                 }
386
387                 source->state = source->data_size > 0 ?
388                         STATE_DATA : STATE_DATA_FINISH;
389
390                 return 0; /* continue */
391
392         case STATE_DATA: {
393                 void *data;
394                 char *field;
395
396                 assert(source->data_size > 0);
397
398                 r = get_data_data(source, &data);
399                 // log_debug("get_data_data() -> %d", r);
400                 if (r < 0)
401                         return r;
402                 if (r == 0) {
403                         source->state = STATE_EOF;
404                         return 0;
405                 }
406
407                 assert(data);
408
409                 field = (char*) data - sizeof(uint64_t) - source->field_len;
410                 memmove(field + sizeof(uint64_t), field, source->field_len);
411
412                 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
413                 if (r < 0)
414                         return r;
415
416                 source->state = STATE_DATA_FINISH;
417
418                 return 0; /* continue */
419         }
420
421         case STATE_DATA_FINISH:
422                 r = get_data_newline(source);
423                 // log_debug("get_data_newline() -> %d", r);
424                 if (r < 0)
425                         return r;
426                 if (r == 0) {
427                         source->state = STATE_EOF;
428                         return 0;
429                 }
430
431                 source->data_size = 0;
432                 source->state = STATE_LINE;
433
434                 return 0; /* continue */
435         default:
436                 assert_not_reached("wtf?");
437         }
438 }
439
440 int process_source(RemoteSource *source, bool compress, bool seal) {
441         size_t remain, target;
442         int r;
443
444         assert(source);
445         assert(source->writer);
446
447         r = process_data(source);
448         if (r <= 0)
449                 return r;
450
451         /* We have a full event */
452         log_trace("Received full event from source@%p fd:%d (%s)",
453                   source, source->fd, source->name);
454
455         if (!source->iovw.count) {
456                 log_warning("Entry with no payload, skipping");
457                 goto freeing;
458         }
459
460         assert(source->iovw.iovec);
461         assert(source->iovw.count);
462
463         r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
464         if (r < 0)
465                 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
466                                 iovw_size(&source->iovw));
467         else
468                 r = 1;
469
470  freeing:
471         iovw_free_contents(&source->iovw);
472
473         /* possibly reset buffer position */
474         remain = source->filled - source->offset;
475
476         if (remain == 0) /* no brainer */
477                 source->offset = source->scanned = source->filled = 0;
478         else if (source->offset > source->size - source->filled &&
479                  source->offset > remain) {
480                 memcpy(source->buf, source->buf + source->offset, remain);
481                 source->offset = source->scanned = 0;
482                 source->filled = remain;
483         }
484
485         target = source->size;
486         while (target > 16 * LINE_CHUNK && remain < target / 2)
487                 target /= 2;
488         if (target < source->size) {
489                 char *tmp;
490
491                 tmp = realloc(source->buf, target);
492                 if (!tmp)
493                         log_warning("Failed to reallocate buffer to (smaller) size %zu",
494                                     target);
495                 else {
496                         log_debug("Reallocated buffer from %zu to %zu bytes",
497                                   source->size, target);
498                         source->buf = tmp;
499                         source->size = target;
500                 }
501         }
502
503         return r;
504 }