1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
/* Buffer sizing granularity in bytes: get_line() keeps at least this much
 * room free before calling read(), and process_source() only considers
 * shrinking buffers larger than 16 of these. The replacement list is
 * parenthesized so the macro expands as a single value in any expression
 * (e.g. when used as a divisor). */
#define LINE_CHUNK (8*1024u)
/* Tear down a RemoteSource: close its fd (unless the fd is passive), free the
 * contents of its iovec wrapper, drop its writer reference and drop both of
 * its event source references. */
27 void source_free(RemoteSource *source) {
/* Only an fd that is valid and not marked passive is closed here. */
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
38 iovw_free_contents(&source->iovw);
/* NOTE(review): source->writer is dereferenced for logging before being
 * unreferenced; this assumes it is never NULL - confirm against callers. */
40 log_debug("Writer ref count %i", source->writer->n_ref);
41 writer_unref(source->writer);
43 sd_event_source_unref(source->event);
44 sd_event_source_unref(source->buffer_event);
50 * Initialize zero-filled source with given values. On success, takes
51 * ownership of fd and writer, otherwise does not touch them.
/* Create a RemoteSource for the given fd. The creation is logged at debug
 * level, the object is allocated zero-filled with new0(), and the passive_fd
 * flag and the writer pointer are stored in it. */
53 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
57 log_debug("Creating source for %sfd:%d (%s)",
58 passive_fd ? "passive " : "", fd, name);
/* new0() yields zeroed memory, so fields not assigned below start out as 0. */
62 source = new0(RemoteSource, 1);
67 source->passive_fd = passive_fd;
69 source->writer = writer;
/* Resize source->buf through GREEDY_REALLOC so that it can hold `size` bytes,
 * then tell the iovec wrapper about the buffer's old and new addresses. */
74 static char* realloc_buffer(RemoteSource *source, size_t size) {
/* Remember the current address: the reallocation below may move the buffer. */
75 char *b, *old = source->buf;
77 b = GREEDY_REALLOC(source->buf, source->size, size);
/* Presumably re-points iovec entries that referenced the old buffer at the
 * new one - verify against iovw_rebase(). */
81 iovw_rebase(&source->iovw, old, source->buf);
/*
 * Extract the next newline-terminated line from the source buffer, reading
 * more bytes from source->fd when no '\n' has been buffered yet.
 *
 * When a newline is found, *line is set to the start of the line inside
 * source->buf, *size to its length including the '\n', and source->offset is
 * advanced past it.
 */
86 static int get_line(RemoteSource *source, char **line, size_t *size) {
91 assert(source->state == STATE_LINE);
92 assert(source->offset <= source->filled);
93 assert(source->filled <= source->size);
94 assert(source->buf == NULL || source->size > 0);
95 assert(source->fd >= 0);
/* Resume the search where the previous scan stopped, but never before the
 * start of the unconsumed data. */
99 size_t start = MAX(source->scanned, source->offset);
101 c = memchr(source->buf + start, '\n',
102 source->filled - start);
/* Everything buffered so far has now been searched for a newline. */
107 source->scanned = source->filled;
/* A line still unterminated after DATA_SIZE_MAX scanned bytes is an error. */
108 if (source->scanned >= DATA_SIZE_MAX) {
109 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
113 if (source->passive_fd)
114 /* we have to wait for some data to come to us */
/* Keep at least LINE_CHUNK bytes of free space before reading; the requested
 * buffer size is capped at ENTRY_SIZE_MAX. */
117 if (source->size - source->filled < LINE_CHUNK &&
118 !realloc_buffer(source,
119 MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
122 assert(source->size - source->filled >= LINE_CHUNK ||
123 source->size == ENTRY_SIZE_MAX);
/* Read into the free tail of the buffer. */
125 n = read(source->fd, source->buf + source->filled,
126 source->size - source->filled);
/* EAGAIN / EWOULDBLOCK are not logged as errors. */
128 if (errno != EAGAIN && errno != EWOULDBLOCK)
129 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
130 source->size - source->filled);
/* c points at the '\n': the line runs from offset up to and including c. */
138 *line = source->buf + source->offset;
139 *size = c + 1 - source->buf - source->offset;
140 source->offset += *size;
/* Append `size` bytes from `data` to the end of the source buffer: the buffer
 * is first resized to hold source->filled + size bytes, then the bytes are
 * copied in and source->filled is increased. Must not be called on a source
 * in STATE_EOF. */
145 int push_data(RemoteSource *source, const char *data, size_t size) {
147 assert(source->state != STATE_EOF);
149 if (!realloc_buffer(source, source->filled + size)) {
/* Logs the incoming size, the current buffer size and the filled amount. */
150 log_error("Failed to store received data of size %zu "
151 "(in addition to existing %zu bytes with %zu filled): %s",
152 size, source->size, source->filled, strerror(ENOMEM));
156 memcpy(source->buf + source->filled, data, size);
157 source->filled += size;
/*
 * Make at least `size` unconsumed bytes available in the source buffer
 * (source->filled - source->offset >= size), reading from source->fd as
 * needed. Then *data is pointed at those bytes and they are consumed by
 * advancing source->offset by `size`.
 *
 * Only valid in the three binary-field states asserted below.
 */
162 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
165 assert(source->state == STATE_DATA_START ||
166 source->state == STATE_DATA ||
167 source->state == STATE_DATA_FINISH);
168 assert(size <= DATA_SIZE_MAX);
169 assert(source->offset <= source->filled);
170 assert(source->filled <= source->size);
171 assert(source->buf != NULL || source->size == 0);
172 assert(source->buf == NULL || source->size > 0);
173 assert(source->fd >= 0);
/* Loop until enough bytes are buffered past the current offset. */
176 while (source->filled - source->offset < size) {
179 if (source->passive_fd)
180 /* we have to wait for some data to come to us */
/* The buffer must be able to hold everything up to offset + size. */
183 if (!realloc_buffer(source, source->offset + size))
/* Read into the free tail of the buffer. */
186 n = read(source->fd, source->buf + source->filled,
187 source->size - source->filled);
/* EAGAIN / EWOULDBLOCK are not logged as errors. */
189 if (errno != EAGAIN && errno != EWOULDBLOCK)
190 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
191 source->size - source->filled);
/* Hand out the requested bytes and mark them as consumed. */
199 *data = source->buf + source->offset;
200 source->offset += size;
205 static int get_data_size(RemoteSource *source) {
210 assert(source->state == STATE_DATA_START);
211 assert(source->data_size == 0);
213 r = fill_fixed_size(source, &data, sizeof(uint64_t));
217 source->data_size = le64toh( *(uint64_t *) data );
218 if (source->data_size > DATA_SIZE_MAX) {
219 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
220 source->data_size, DATA_SIZE_MAX);
223 if (source->data_size == 0)
224 log_warning("Binary field with zero length");
/* Fetch the payload of a binary field: asks fill_fixed_size() for
 * source->data_size bytes and passes the resulting pointer back via *data. */
229 static int get_data_data(RemoteSource *source, void **data) {
234 assert(source->state == STATE_DATA);
236 r = fill_fixed_size(source, data, source->data_size);
/* Consume the single byte that follows a binary field's payload. Per the
 * error message below, that byte is expected to be a newline. */
243 static int get_data_newline(RemoteSource *source) {
248 assert(source->state == STATE_DATA_FINISH);
/* Exactly one byte is requested. */
250 r = fill_fixed_size(source, (void**) &data, 1);
256 log_error("expected newline, got '%c'", *data);
/*
 * Handle lines whose field name starts with "__". `line` holds `n` bytes and
 * ends with '\n'.
 *
 * __CURSOR is ignored. __REALTIME_TIMESTAMP and __MONOTONIC_TIMESTAMP are
 * parsed with safe_atollu() and stored in source->ts; for those two the
 * function returns the negative parse result on failure and 1 otherwise.
 * Any other "__" line is reported at notice level.
 */
263 static int process_dunder(RemoteSource *source, char *line, size_t n) {
/* Reused below as "rest of the line after the matched prefix". */
264 const char *timestamp;
269 assert(line[n-1] == '\n');
271 /* XXX: is it worth to support timestamps in extended format?
272 * We don't produce them, but who knows... */
274 timestamp = startswith(line, "__CURSOR=");
276 /* ignore __CURSOR */
279 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
281 long long unsigned x;
283 r = safe_atollu(timestamp, &x);
285 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
287 source->ts.realtime = x;
288 return r < 0 ? r : 1;
291 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
293 long long unsigned x;
295 r = safe_atollu(timestamp, &x);
297 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
299 source->ts.monotonic = x;
300 return r < 0 ? r : 1;
/* Catch-all for every other field name that begins with two underscores. */
303 timestamp = startswith(line, "__");
305 log_notice("Unknown dunder line %s", line);
/*
 * Advance the parser of one source by a single step, dispatching on
 * source->state. A step reads a text line, the size of a binary field, its
 * payload, or the newline that ends it. Steps that consumed input but did
 * not complete an entry end with "return 0" (continue).
 */
313 int process_data(RemoteSource *source) {
316 switch(source->state) {
/* Line step: no binary field may be in progress. */
321 assert(source->data_size == 0);
323 r = get_line(source, &line, &n);
327 source->state = STATE_EOF;
331 assert(line[n-1] == '\n');
/* A line consisting only of '\n' terminates the current entry. */
334 log_trace("Received empty line, event is ready");
/* Lines starting with "__" are handled separately; a positive result from
 * process_dunder() is mapped to 0 here. */
338 r = process_dunder(source, line, n);
340 return r < 0 ? r : 0;
345 LLLLLLLL0011223344...\n
/* A '=' in the line means a text field; otherwise the line is the name of a
 * binary field (see the trace message below). */
347 sep = memchr(line, '=', n);
/* Text field: the line itself is added to the iovec. */
352 r = iovw_put(&source->iovw, line, n);
356 /* replace \n with = */
/* Binary field: remember the length of the name and go read the size. */
359 source->field_len = n;
360 source->state = STATE_DATA_START;
362 /* we cannot put the field in iovec until we have all data */
365 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
367 return 0; /* continue */
370 case STATE_DATA_START:
371 assert(source->data_size == 0);
373 r = get_data_size(source);
374 // log_debug("get_data_size() -> %d", r);
378 source->state = STATE_EOF;
/* A zero-length payload skips straight to the trailing-newline step. */
382 source->state = source->data_size > 0 ?
383 STATE_DATA : STATE_DATA_FINISH;
385 return 0; /* continue */
/* Payload step. */
391 assert(source->data_size > 0);
393 r = get_data_data(source, &data);
394 // log_debug("get_data_data() -> %d", r);
398 source->state = STATE_EOF;
/* In the buffer the field name is followed by the 8-byte size and then the
 * payload. Locate the name, shift it forward over the size bytes so that
 * name and payload become contiguous, and add both as one iovec entry. */
404 field = (char*) data - sizeof(uint64_t) - source->field_len;
405 memmove(field + sizeof(uint64_t), field, source->field_len);
407 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
411 source->state = STATE_DATA_FINISH;
413 return 0; /* continue */
416 case STATE_DATA_FINISH:
417 r = get_data_newline(source);
418 // log_debug("get_data_newline() -> %d", r);
422 source->state = STATE_EOF;
/* Binary field complete: reset the size and go back to reading lines. */
426 source->data_size = 0;
427 source->state = STATE_LINE;
429 return 0; /* continue */
431 assert_not_reached("wtf?");
435 int process_source(RemoteSource *source, bool compress, bool seal) {
436 size_t remain, target;
440 assert(source->writer);
442 r = process_data(source);
446 /* We have a full event */
447 log_trace("Received full event from source@%p fd:%d (%s)",
448 source, source->fd, source->name);
450 if (!source->iovw.count) {
451 log_warning("Entry with no payload, skipping");
455 assert(source->iovw.iovec);
456 assert(source->iovw.count);
458 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
460 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
461 iovw_size(&source->iovw));
466 iovw_free_contents(&source->iovw);
468 /* possibly reset buffer position */
469 remain = source->filled - source->offset;
471 if (remain == 0) /* no brainer */
472 source->offset = source->scanned = source->filled = 0;
473 else if (source->offset > source->size - source->filled &&
474 source->offset > remain) {
475 memcpy(source->buf, source->buf + source->offset, remain);
476 source->offset = source->scanned = 0;
477 source->filled = remain;
480 target = source->size;
481 while (target > 16 * LINE_CHUNK && remain < target / 2)
483 if (target < source->size) {
486 tmp = realloc(source->buf, target);
488 log_warning("Failed to reallocate buffer to (smaller) size %zu",
491 log_debug("Reallocated buffer from %zu to %zu bytes",
492 source->size, target);
494 source->size = target;