chiark / gitweb /
journal-remote: process events without delay
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 8*1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0 && !source->passive_fd) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 safe_close(source->fd);
34         }
35
36         free(source->name);
37         free(source->buf);
38         iovw_free_contents(&source->iovw);
39
40         log_debug("Writer ref count %i", source->writer->n_ref);
41         writer_unref(source->writer);
42
43         sd_event_source_unref(source->event);
44         sd_event_source_unref(source->buffer_event);
45
46         free(source);
47 }
48
49 /**
50  * Initialize zero-filled source with given values. On success, takes
51  * ownerhship of fd and writer, otherwise does not touch them.
52  */
53 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
54
55         RemoteSource *source;
56
57         log_debug("Creating source for %sfd:%d (%s)",
58                   passive_fd ? "passive " : "", fd, name);
59
60         assert(fd >= 0);
61
62         source = new0(RemoteSource, 1);
63         if (!source)
64                 return NULL;
65
66         source->fd = fd;
67         source->passive_fd = passive_fd;
68         source->name = name;
69         source->writer = writer;
70
71         return source;
72 }
73
74 static char* realloc_buffer(RemoteSource *source, size_t size) {
75         char *b, *old = source->buf;
76
77         b = GREEDY_REALLOC(source->buf, source->size, size);
78         if (!b)
79                 return NULL;
80
81         iovw_rebase(&source->iovw, old, source->buf);
82
83         return b;
84 }
85
86 static int get_line(RemoteSource *source, char **line, size_t *size) {
87         ssize_t n;
88         char *c = NULL;
89
90         assert(source);
91         assert(source->state == STATE_LINE);
92         assert(source->offset <= source->filled);
93         assert(source->filled <= source->size);
94         assert(source->buf == NULL || source->size > 0);
95         assert(source->fd >= 0);
96
97         while (true) {
98                 if (source->buf) {
99                         size_t start = MAX(source->scanned, source->offset);
100
101                         c = memchr(source->buf + start, '\n',
102                                    source->filled - start);
103                         if (c != NULL)
104                                 break;
105                 }
106
107                 source->scanned = source->filled;
108                 if (source->scanned >= DATA_SIZE_MAX) {
109                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
110                         return -E2BIG;
111                 }
112
113                 if (source->passive_fd)
114                         /* we have to wait for some data to come to us */
115                         return -EWOULDBLOCK;
116
117                 if (source->size - source->filled < LINE_CHUNK &&
118                     !realloc_buffer(source,
119                                     MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
120                                 return log_oom();
121
122                 assert(source->size - source->filled >= LINE_CHUNK ||
123                        source->size == ENTRY_SIZE_MAX);
124
125                 n = read(source->fd, source->buf + source->filled,
126                          source->size - source->filled);
127                 if (n < 0) {
128                         if (errno != EAGAIN && errno != EWOULDBLOCK)
129                                 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
130                                                 source->size - source->filled);
131                         return -errno;
132                 } else if (n == 0)
133                         return 0;
134
135                 source->filled += n;
136         }
137
138         *line = source->buf + source->offset;
139         *size = c + 1 - source->buf - source->offset;
140         source->offset += *size;
141
142         return 1;
143 }
144
145 int push_data(RemoteSource *source, const char *data, size_t size) {
146         assert(source);
147         assert(source->state != STATE_EOF);
148
149         if (!realloc_buffer(source, source->filled + size)) {
150                 log_error("Failed to store received data of size %zu "
151                           "(in addition to existing %zu bytes with %zu filled): %s",
152                           size, source->size, source->filled, strerror(ENOMEM));
153                 return -ENOMEM;
154         }
155
156         memcpy(source->buf + source->filled, data, size);
157         source->filled += size;
158
159         return 0;
160 }
161
162 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
163
164         assert(source);
165         assert(source->state == STATE_DATA_START ||
166                source->state == STATE_DATA ||
167                source->state == STATE_DATA_FINISH);
168         assert(size <= DATA_SIZE_MAX);
169         assert(source->offset <= source->filled);
170         assert(source->filled <= source->size);
171         assert(source->buf != NULL || source->size == 0);
172         assert(source->buf == NULL || source->size > 0);
173         assert(source->fd >= 0);
174         assert(data);
175
176         while (source->filled - source->offset < size) {
177                 int n;
178
179                 if (source->passive_fd)
180                         /* we have to wait for some data to come to us */
181                         return -EWOULDBLOCK;
182
183                 if (!realloc_buffer(source, source->offset + size))
184                         return log_oom();
185
186                 n = read(source->fd, source->buf + source->filled,
187                          source->size - source->filled);
188                 if (n < 0) {
189                         if (errno != EAGAIN && errno != EWOULDBLOCK)
190                                 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
191                                                 source->size - source->filled);
192                         return -errno;
193                 } else if (n == 0)
194                         return 0;
195
196                 source->filled += n;
197         }
198
199         *data = source->buf + source->offset;
200         source->offset += size;
201
202         return 1;
203 }
204
205 static int get_data_size(RemoteSource *source) {
206         int r;
207         void *data;
208
209         assert(source);
210         assert(source->state == STATE_DATA_START);
211         assert(source->data_size == 0);
212
213         r = fill_fixed_size(source, &data, sizeof(uint64_t));
214         if (r <= 0)
215                 return r;
216
217         source->data_size = le64toh( *(uint64_t *) data );
218         if (source->data_size > DATA_SIZE_MAX) {
219                 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
220                           source->data_size, DATA_SIZE_MAX);
221                 return -EINVAL;
222         }
223         if (source->data_size == 0)
224                 log_warning("Binary field with zero length");
225
226         return 1;
227 }
228
229 static int get_data_data(RemoteSource *source, void **data) {
230         int r;
231
232         assert(source);
233         assert(data);
234         assert(source->state == STATE_DATA);
235
236         r = fill_fixed_size(source, data, source->data_size);
237         if (r <= 0)
238                 return r;
239
240         return 1;
241 }
242
243 static int get_data_newline(RemoteSource *source) {
244         int r;
245         char *data;
246
247         assert(source);
248         assert(source->state == STATE_DATA_FINISH);
249
250         r = fill_fixed_size(source, (void**) &data, 1);
251         if (r <= 0)
252                 return r;
253
254         assert(data);
255         if (*data != '\n') {
256                 log_error("expected newline, got '%c'", *data);
257                 return -EINVAL;
258         }
259
260         return 1;
261 }
262
263 static int process_dunder(RemoteSource *source, char *line, size_t n) {
264         const char *timestamp;
265         int r;
266
267         assert(line);
268         assert(n > 0);
269         assert(line[n-1] == '\n');
270
271         /* XXX: is it worth to support timestamps in extended format?
272          * We don't produce them, but who knows... */
273
274         timestamp = startswith(line, "__CURSOR=");
275         if (timestamp)
276                 /* ignore __CURSOR */
277                 return 1;
278
279         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
280         if (timestamp) {
281                 long long unsigned x;
282                 line[n-1] = '\0';
283                 r = safe_atollu(timestamp, &x);
284                 if (r < 0)
285                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
286                 else
287                         source->ts.realtime = x;
288                 return r < 0 ? r : 1;
289         }
290
291         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
292         if (timestamp) {
293                 long long unsigned x;
294                 line[n-1] = '\0';
295                 r = safe_atollu(timestamp, &x);
296                 if (r < 0)
297                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
298                 else
299                         source->ts.monotonic = x;
300                 return r < 0 ? r : 1;
301         }
302
303         timestamp = startswith(line, "__");
304         if (timestamp) {
305                 log_notice("Unknown dunder line %s", line);
306                 return 1;
307         }
308
309         /* no dunder */
310         return 0;
311 }
312
313 int process_data(RemoteSource *source) {
314         int r;
315
316         switch(source->state) {
317         case STATE_LINE: {
318                 char *line, *sep;
319                 size_t n;
320
321                 assert(source->data_size == 0);
322
323                 r = get_line(source, &line, &n);
324                 if (r < 0)
325                         return r;
326                 if (r == 0) {
327                         source->state = STATE_EOF;
328                         return r;
329                 }
330                 assert(n > 0);
331                 assert(line[n-1] == '\n');
332
333                 if (n == 1) {
334                         log_trace("Received empty line, event is ready");
335                         return 1;
336                 }
337
338                 r = process_dunder(source, line, n);
339                 if (r != 0)
340                         return r < 0 ? r : 0;
341
342                 /* MESSAGE=xxx\n
343                    or
344                    COREDUMP\n
345                    LLLLLLLL0011223344...\n
346                 */
347                 sep = memchr(line, '=', n);
348                 if (sep) {
349                         /* chomp newline */
350                         n--;
351
352                         r = iovw_put(&source->iovw, line, n);
353                         if (r < 0)
354                                 return r;
355                 } else {
356                         /* replace \n with = */
357                         line[n-1] = '=';
358
359                         source->field_len = n;
360                         source->state = STATE_DATA_START;
361
362                         /* we cannot put the field in iovec until we have all data */
363                 }
364
365                 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
366
367                 return 0; /* continue */
368         }
369
370         case STATE_DATA_START:
371                 assert(source->data_size == 0);
372
373                 r = get_data_size(source);
374                 // log_debug("get_data_size() -> %d", r);
375                 if (r < 0)
376                         return r;
377                 if (r == 0) {
378                         source->state = STATE_EOF;
379                         return 0;
380                 }
381
382                 source->state = source->data_size > 0 ?
383                         STATE_DATA : STATE_DATA_FINISH;
384
385                 return 0; /* continue */
386
387         case STATE_DATA: {
388                 void *data;
389                 char *field;
390
391                 assert(source->data_size > 0);
392
393                 r = get_data_data(source, &data);
394                 // log_debug("get_data_data() -> %d", r);
395                 if (r < 0)
396                         return r;
397                 if (r == 0) {
398                         source->state = STATE_EOF;
399                         return 0;
400                 }
401
402                 assert(data);
403
404                 field = (char*) data - sizeof(uint64_t) - source->field_len;
405                 memmove(field + sizeof(uint64_t), field, source->field_len);
406
407                 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
408                 if (r < 0)
409                         return r;
410
411                 source->state = STATE_DATA_FINISH;
412
413                 return 0; /* continue */
414         }
415
416         case STATE_DATA_FINISH:
417                 r = get_data_newline(source);
418                 // log_debug("get_data_newline() -> %d", r);
419                 if (r < 0)
420                         return r;
421                 if (r == 0) {
422                         source->state = STATE_EOF;
423                         return 0;
424                 }
425
426                 source->data_size = 0;
427                 source->state = STATE_LINE;
428
429                 return 0; /* continue */
430         default:
431                 assert_not_reached("wtf?");
432         }
433 }
434
435 int process_source(RemoteSource *source, bool compress, bool seal) {
436         size_t remain, target;
437         int r;
438
439         assert(source);
440         assert(source->writer);
441
442         r = process_data(source);
443         if (r <= 0)
444                 return r;
445
446         /* We have a full event */
447         log_trace("Received full event from source@%p fd:%d (%s)",
448                   source, source->fd, source->name);
449
450         if (!source->iovw.count) {
451                 log_warning("Entry with no payload, skipping");
452                 goto freeing;
453         }
454
455         assert(source->iovw.iovec);
456         assert(source->iovw.count);
457
458         r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
459         if (r < 0)
460                 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
461                                 iovw_size(&source->iovw));
462         else
463                 r = 1;
464
465  freeing:
466         iovw_free_contents(&source->iovw);
467
468         /* possibly reset buffer position */
469         remain = source->filled - source->offset;
470
471         if (remain == 0) /* no brainer */
472                 source->offset = source->scanned = source->filled = 0;
473         else if (source->offset > source->size - source->filled &&
474                  source->offset > remain) {
475                 memcpy(source->buf, source->buf + source->offset, remain);
476                 source->offset = source->scanned = 0;
477                 source->filled = remain;
478         }
479
480         target = source->size;
481         while (target > 16 * LINE_CHUNK && remain < target / 2)
482                 target /= 2;
483         if (target < source->size) {
484                 char *tmp;
485
486                 tmp = realloc(source->buf, target);
487                 if (!tmp)
488                         log_warning("Failed to reallocate buffer to (smaller) size %zu",
489                                     target);
490                 else {
491                         log_debug("Reallocated buffer from %zu to %zu bytes",
492                                   source->size, target);
493                         source->buf = tmp;
494                         source->size = target;
495                 }
496         }
497
498         return r;
499 }