1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
/* Chunk size in bytes: the line reader keeps at least this much free space
 * in the buffer before reading, and the buffer-shrink threshold is expressed
 * as a multiple of it. The replacement list is parenthesized so the macro
 * expands to a single operand in any surrounding expression
 * (e.g. x / LINE_CHUNK or x % LINE_CHUNK). */
#define LINE_CHUNK (8*1024u)
/* Tears down a RemoteSource: closes its fd (unless the fd is passive),
 * releases the contents of the collected iovec array, drops the reference
 * on the writer and unreferences both event sources.
 * NOTE(review): this listing omits some lines of the function (closing
 * braces and possibly further statements); only the visible statements
 * are described here. */
27 void source_free(RemoteSource *source) {
/* Only close descriptors that are valid and not marked passive. */
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
38 iovw_free_contents(&source->iovw);
/* The writer is dereferenced unconditionally for this log message, so the
 * source is expected to always carry a writer at this point. */
40 log_debug("Writer ref count %i", source->writer->n_ref);
41 writer_unref(source->writer);
43 sd_event_source_unref(source->event);
44 sd_event_source_unref(source->buffer_event);
50 * Initialize zero-filled source with given values. On success, takes
51 * ownership of fd and writer, otherwise does not touch them.
/* Allocates a zero-initialized RemoteSource with new0() and stores the
 * passive_fd flag and the writer pointer in it.
 * NOTE(review): the handling of fd and name, the allocation-failure check
 * and the return statement are not visible in this listing -- confirm
 * against the complete file. */
53 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
/* Logged before anything is allocated. */
57 log_debug("Creating source for %sfd:%d (%s)",
58 passive_fd ? "passive " : "", fd, name);
62 source = new0(RemoteSource, 1);
67 source->passive_fd = passive_fd;
69 source->writer = writer;
/* Resizes source->buf through GREEDY_REALLOC for a requested capacity of
 * `size`, then calls iovw_rebase() with the old and new buffer addresses.
 * Presumably the rebase fixes up iovec entries that point into the old
 * buffer -- confirm in iovw_rebase().
 * NOTE(review): the failure check and the return statement are not visible
 * in this listing; callers below treat a NULL result as failure. */
74 static char* realloc_buffer(RemoteSource *source, size_t size) {
/* Remember the old address: it is needed for the rebase after reallocation. */
75 char *b, *old = source->buf;
77 b = GREEDY_REALLOC(source->buf, source->size, size);
81 iovw_rebase(&source->iovw, old, source->buf);
/* Extracts the next '\n'-terminated line from the source buffer, reading
 * more data from source->fd when no newline is buffered yet.
 *
 * On success *line points at source->buf + source->offset, *size is the
 * line length including the trailing '\n', and source->offset is advanced
 * past the line.
 *
 * NOTE(review): the loop header, the return statements and some branches
 * are not visible in this listing; the return-value contract must be
 * confirmed against the complete file. */
86 static int get_line(RemoteSource *source, char **line, size_t *size) {
91 assert(source->state == STATE_LINE);
92 assert(source->offset <= source->filled);
93 assert(source->filled <= source->size);
94 assert(source->buf == NULL || source->size > 0);
95 assert(source->fd >= 0);
/* Resume the search where the previous scan stopped, but never before the
 * current read offset. */
99 size_t start = MAX(source->scanned, source->offset);
101 c = memchr(source->buf + start, '\n',
102 source->filled - start);
/* Everything buffered so far has been searched. */
107 source->scanned = source->filled;
108 if (source->scanned >= DATA_SIZE_MAX) {
109 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
113 if (source->passive_fd)
114 /* we have to wait for some data to come to us */
117 /* We know that source->filled is at most DATA_SIZE_MAX, so if
118 we reallocate it, we'll increase the size at least a bit. */
119 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
/* Grow only when less than LINE_CHUNK bytes are free; the requested size is
 * capped at ENTRY_SIZE_MAX. */
120 if (source->size - source->filled < LINE_CHUNK &&
121 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
125 assert(source->size - source->filled >= LINE_CHUNK ||
126 source->size == ENTRY_SIZE_MAX);
/* Read into the free tail of the buffer. */
129 source->buf + source->filled,
130 source->size - source->filled);
133 log_error_errno(errno, "read(%d, ..., %zu): %m",
135 source->size - source->filled);
/* c points at the newline that was found; the reported size includes it. */
143 *line = source->buf + source->offset;
144 *size = c + 1 - source->buf - source->offset;
145 source->offset += *size;
/* Appends `size` bytes from `data` to the source buffer: the buffer is
 * resized via realloc_buffer() to hold source->filled + size bytes, the
 * bytes are copied behind the already filled part and source->filled is
 * increased accordingly. Must not be called once the source is in
 * STATE_EOF.
 * NOTE(review): the return statements are not visible in this listing. */
150 int push_data(RemoteSource *source, const char *data, size_t size) {
152 assert(source->state != STATE_EOF);
154 if (!realloc_buffer(source, source->filled + size)) {
155 log_error("Failed to store received data of size %zu "
156 "(in addition to existing %zu bytes with %zu filled): %s",
157 size, source->size, source->filled, strerror(ENOMEM));
/* Copy behind the data that is already buffered. */
161 memcpy(source->buf + source->filled, data, size);
162 source->filled += size;
/* Makes sure that at least `size` unconsumed bytes are buffered behind
 * source->offset, reading from source->fd as long as fewer are available.
 * Afterwards *data points at source->buf + source->offset and the offset
 * is advanced by `size`.
 *
 * Only valid in the binary-field states (STATE_DATA_START, STATE_DATA,
 * STATE_DATA_FINISH) and for size <= DATA_SIZE_MAX.
 *
 * NOTE(review): the return statements and the handling of the read()
 * result are not visible in this listing. */
167 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
170 assert(source->state == STATE_DATA_START ||
171 source->state == STATE_DATA ||
172 source->state == STATE_DATA_FINISH);
173 assert(size <= DATA_SIZE_MAX);
174 assert(source->offset <= source->filled);
175 assert(source->filled <= source->size);
176 assert(source->buf != NULL || source->size == 0);
177 assert(source->buf == NULL || source->size > 0);
178 assert(source->fd >= 0);
/* Loop until the unconsumed part of the buffer covers the request. */
181 while (source->filled - source->offset < size) {
184 if (source->passive_fd)
185 /* we have to wait for some data to come to us */
/* Request room for the whole item, counted from the current offset. */
188 if (!realloc_buffer(source, source->offset + size))
191 n = read(source->fd, source->buf + source->filled,
192 source->size - source->filled);
195 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
196 source->size - source->filled);
/* Hand out the item in place; nothing is copied. */
204 *data = source->buf + source->offset;
205 source->offset += size;
/* Reads the 64-bit little-endian length that precedes a binary field value
 * and stores it, converted to host byte order, in source->data_size.
 * Logs an error when the declared length exceeds DATA_SIZE_MAX and a
 * warning when it is zero.
 * NOTE(review): the return statements are not visible in this listing. */
210 static int get_data_size(RemoteSource *source) {
215 assert(source->state == STATE_DATA_START);
216 assert(source->data_size == 0);
218 r = fill_fixed_size(source, &data, sizeof(uint64_t));
/* NOTE(review): data points into the byte buffer at an arbitrary offset, so
 * this cast may perform an unaligned uint64_t load -- confirm this is
 * acceptable on all supported targets. */
222 source->data_size = le64toh( *(uint64_t *) data );
223 if (source->data_size > DATA_SIZE_MAX) {
224 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
225 source->data_size, DATA_SIZE_MAX);
228 if (source->data_size == 0)
229 log_warning("Binary field with zero length");
/* Fetches the value of a binary field: asks fill_fixed_size() for
 * source->data_size bytes and returns their location through *data.
 * Only valid in STATE_DATA.
 * NOTE(review): the return statements are not visible in this listing. */
234 static int get_data_data(RemoteSource *source, void **data) {
239 assert(source->state == STATE_DATA);
241 r = fill_fixed_size(source, data, source->data_size);
/* Consumes the single byte that follows a binary field value; an error is
 * logged when that byte is not the expected newline. Only valid in
 * STATE_DATA_FINISH.
 * NOTE(review): the comparison and the return statements are not visible
 * in this listing. */
248 static int get_data_newline(RemoteSource *source) {
253 assert(source->state == STATE_DATA_FINISH);
/* Exactly one byte is requested. */
255 r = fill_fixed_size(source, (void**) &data, 1);
261 log_error("expected newline, got '%c'", *data);
/* Handles lines whose field name starts with a double underscore.
 *
 * __CURSOR= is ignored. __REALTIME_TIMESTAMP= and __MONOTONIC_TIMESTAMP=
 * are parsed with safe_atollu() and stored in source->ts; for those the
 * function returns the negative parse error, or 1 on success. Any other
 * "__" line is reported with a notice.
 *
 * `line` must hold `n` bytes and end in '\n'.
 *
 * NOTE(review): the conditions around each branch and the remaining return
 * statements are not visible in this listing. */
268 static int process_dunder(RemoteSource *source, char *line, size_t n) {
269 const char *timestamp;
274 assert(line[n-1] == '\n');
276 /* XXX: is it worth to support timestamps in extended format?
277 * We don't produce them, but who knows... */
279 timestamp = startswith(line, "__CURSOR=");
281 /* ignore __CURSOR */
284 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
286 long long unsigned x;
288 r = safe_atollu(timestamp, &x);
290 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
292 source->ts.realtime = x;
293 return r < 0 ? r : 1;
296 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
298 long long unsigned x;
300 r = safe_atollu(timestamp, &x);
302 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
304 source->ts.monotonic = x;
305 return r < 0 ? r : 1;
/* Catch-all for dunder fields that are not recognized above. */
308 timestamp = startswith(line, "__");
310 log_notice("Unknown dunder line %s", line);
/* Advances the parser of one source by a single step, dispatching on
 * source->state.
 *
 * Line state: a line is fetched with get_line(); an empty line marks a
 * complete event, dunder lines go to process_dunder(), a line containing
 * '=' is added to the iovec array as a text field, and a line without '='
 * starts a binary field (state becomes STATE_DATA_START).
 * STATE_DATA_START: the binary length is read; the next state is
 * STATE_DATA, or STATE_DATA_FINISH when the length is zero.
 * Data state: the value is read, the field name is moved next to it and
 * the combined field is added to the iovec array.
 * STATE_DATA_FINISH: the trailing newline is consumed and the parser
 * returns to STATE_LINE.
 *
 * Several branches set STATE_EOF; a return value of 0 means "continue".
 *
 * NOTE(review): some case labels, conditions and return statements are
 * not visible in this listing -- confirm details against the full file. */
318 static int process_data(RemoteSource *source) {
321 switch(source->state) {
326 assert(source->data_size == 0);
328 r = get_line(source, &line, &n);
332 source->state = STATE_EOF;
336 assert(line[n-1] == '\n');
339 log_trace("Received empty line, event is ready");
343 r = process_dunder(source, line, n);
345 return r < 0 ? r : 0;
350 LLLLLLLL0011223344...\n
/* A '=' in the line selects the text form, its absence the binary form. */
352 sep = memchr(line, '=', n);
357 r = iovw_put(&source->iovw, line, n);
361 /* replace \n with = */
/* Remember the length of the field name for the later merge with the data. */
364 source->field_len = n;
365 source->state = STATE_DATA_START;
367 /* we cannot put the field in iovec until we have all data */
370 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
372 return 0; /* continue */
375 case STATE_DATA_START:
376 assert(source->data_size == 0);
378 r = get_data_size(source);
379 // log_debug("get_data_size() -> %d", r);
383 source->state = STATE_EOF;
/* A zero-length value has no data part, so skip straight to the newline. */
387 source->state = source->data_size > 0 ?
388 STATE_DATA : STATE_DATA_FINISH;
390 return 0; /* continue */
396 assert(source->data_size > 0);
398 r = get_data_data(source, &data);
399 // log_debug("get_data_data() -> %d", r);
403 source->state = STATE_EOF;
/* The buffer holds: field name, 8-byte length, data. Shift the field name
 * forward over the length so that name and data are contiguous, then
 * register the merged field. */
409 field = (char*) data - sizeof(uint64_t) - source->field_len;
410 memmove(field + sizeof(uint64_t), field, source->field_len);
412 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
416 source->state = STATE_DATA_FINISH;
418 return 0; /* continue */
421 case STATE_DATA_FINISH:
422 r = get_data_newline(source);
423 // log_debug("get_data_newline() -> %d", r);
427 source->state = STATE_EOF;
/* Field complete: reset for the next line. */
431 source->data_size = 0;
432 source->state = STATE_LINE;
434 return 0; /* continue */
436 assert_not_reached("wtf?");
/* Parses input of one source via process_data() and, once a full event
 * has been collected, writes it with writer_write() (passing `compress`
 * and `seal` through) and releases the iovec contents. Afterwards the
 * buffer positions are reset or compacted and an oversized buffer may be
 * reallocated to a smaller size.
 *
 * NOTE(review): several conditions, the body of the shrink loop and the
 * return statements are not visible in this listing -- confirm against
 * the complete file. */
440 int process_source(RemoteSource *source, bool compress, bool seal) {
441 size_t remain, target;
445 assert(source->writer);
447 r = process_data(source);
451 /* We have a full event */
452 log_trace("Received full event from source@%p fd:%d (%s)",
453 source, source->fd, source->name);
455 if (!source->iovw.count) {
456 log_warning("Entry with no payload, skipping");
460 assert(source->iovw.iovec);
461 assert(source->iovw.count);
463 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
465 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
466 iovw_size(&source->iovw));
471 iovw_free_contents(&source->iovw);
473 /* possibly reset buffer position */
/* Number of buffered bytes that have not been consumed yet. */
474 remain = source->filled - source->offset;
476 if (remain == 0) /* no brainer */
477 source->offset = source->scanned = source->filled = 0;
/* Compact only when the consumed prefix is larger than both the free tail
 * and the remaining data; offset > remain also guarantees that source and
 * destination of the copy do not overlap. */
478 else if (source->offset > source->size - source->filled &&
479 source->offset > remain) {
480 memcpy(source->buf, source->buf + source->offset, remain);
481 source->offset = source->scanned = 0;
482 source->filled = remain;
/* Look for a smaller buffer size: shrinking is considered only above
 * 16 * LINE_CHUNK and while the remaining data fills less than half. */
485 target = source->size;
486 while (target > 16 * LINE_CHUNK && remain < target / 2)
488 if (target < source->size) {
491 tmp = realloc(source->buf, target);
493 log_warning("Failed to reallocate buffer to (smaller) size %zu",
496 log_debug("Reallocated buffer from %zu to %zu bytes",
497 source->size, target);
499 source->size = target;