1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
25 #define LINE_CHUNK 8*1024u
27 void source_free(RemoteSource *source) {
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
38 iovw_free_contents(&source->iovw);
40 log_debug("Writer ref count %u", source->writer->n_ref);
41 writer_unref(source->writer);
43 sd_event_source_unref(source->event);
49 * Initialize zero-filled source with given values. On success, takes
50 * ownerhship of fd and writer, otherwise does not touch them.
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
56 log_debug("Creating source for %sfd:%d (%s)",
57 passive_fd ? "passive " : "", fd, name);
61 source = new0(RemoteSource, 1);
66 source->passive_fd = passive_fd;
68 source->writer = writer;
73 static char* realloc_buffer(RemoteSource *source, size_t size) {
74 char *b, *old = source->buf;
76 b = GREEDY_REALLOC(source->buf, source->size, size);
80 iovw_rebase(&source->iovw, old, source->buf);
85 static int get_line(RemoteSource *source, char **line, size_t *size) {
90 assert(source->state == STATE_LINE);
91 assert(source->offset <= source->filled);
92 assert(source->filled <= source->size);
93 assert(source->buf == NULL || source->size > 0);
94 assert(source->fd >= 0);
98 size_t start = MAX(source->scanned, source->offset);
100 c = memchr(source->buf + start, '\n',
101 source->filled - start);
106 source->scanned = source->filled;
107 if (source->scanned >= DATA_SIZE_MAX) {
108 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
112 if (source->passive_fd)
113 /* we have to wait for some data to come to us */
116 if (source->size - source->filled < LINE_CHUNK &&
117 !realloc_buffer(source,
118 MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
121 assert(source->size - source->filled >= LINE_CHUNK ||
122 source->size == ENTRY_SIZE_MAX);
124 n = read(source->fd, source->buf + source->filled,
125 source->size - source->filled);
127 if (errno != EAGAIN && errno != EWOULDBLOCK)
128 log_error_errno(errno, "read(%d, ..., %zd): %m", source->fd,
129 source->size - source->filled);
137 *line = source->buf + source->offset;
138 *size = c + 1 - source->buf - source->offset;
139 source->offset += *size;
144 int push_data(RemoteSource *source, const char *data, size_t size) {
146 assert(source->state != STATE_EOF);
148 if (!realloc_buffer(source, source->filled + size)) {
149 log_error("Failed to store received data of size %zu "
150 "(in addition to existing %zu bytes with %zu filled): %s",
151 size, source->size, source->filled, strerror(ENOMEM));
155 memcpy(source->buf + source->filled, data, size);
156 source->filled += size;
161 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
164 assert(source->state == STATE_DATA_START ||
165 source->state == STATE_DATA ||
166 source->state == STATE_DATA_FINISH);
167 assert(size <= DATA_SIZE_MAX);
168 assert(source->offset <= source->filled);
169 assert(source->filled <= source->size);
170 assert(source->buf != NULL || source->size == 0);
171 assert(source->buf == NULL || source->size > 0);
172 assert(source->fd >= 0);
175 while (source->filled - source->offset < size) {
178 if (source->passive_fd)
179 /* we have to wait for some data to come to us */
182 if (!realloc_buffer(source, source->offset + size))
185 n = read(source->fd, source->buf + source->filled,
186 source->size - source->filled);
188 if (errno != EAGAIN && errno != EWOULDBLOCK)
189 log_error_errno(errno, "read(%d, ..., %zd): %m", source->fd,
190 source->size - source->filled);
198 *data = source->buf + source->offset;
199 source->offset += size;
204 static int get_data_size(RemoteSource *source) {
209 assert(source->state == STATE_DATA_START);
210 assert(source->data_size == 0);
212 r = fill_fixed_size(source, &data, sizeof(uint64_t));
216 source->data_size = le64toh( *(uint64_t *) data );
217 if (source->data_size > DATA_SIZE_MAX) {
218 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
219 source->data_size, DATA_SIZE_MAX);
222 if (source->data_size == 0)
223 log_warning("Binary field with zero length");
228 static int get_data_data(RemoteSource *source, void **data) {
233 assert(source->state == STATE_DATA);
235 r = fill_fixed_size(source, data, source->data_size);
242 static int get_data_newline(RemoteSource *source) {
247 assert(source->state == STATE_DATA_FINISH);
249 r = fill_fixed_size(source, (void**) &data, 1);
255 log_error("expected newline, got '%c'", *data);
262 static int process_dunder(RemoteSource *source, char *line, size_t n) {
263 const char *timestamp;
268 assert(line[n-1] == '\n');
270 /* XXX: is it worth to support timestamps in extended format?
271 * We don't produce them, but who knows... */
273 timestamp = startswith(line, "__CURSOR=");
275 /* ignore __CURSOR */
278 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
280 long long unsigned x;
282 r = safe_atollu(timestamp, &x);
284 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
286 source->ts.realtime = x;
287 return r < 0 ? r : 1;
290 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
292 long long unsigned x;
294 r = safe_atollu(timestamp, &x);
296 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
298 source->ts.monotonic = x;
299 return r < 0 ? r : 1;
302 timestamp = startswith(line, "__");
304 log_notice("Unknown dunder line %s", line);
312 int process_data(RemoteSource *source) {
315 switch(source->state) {
320 assert(source->data_size == 0);
322 r = get_line(source, &line, &n);
326 source->state = STATE_EOF;
330 assert(line[n-1] == '\n');
333 log_trace("Received empty line, event is ready");
337 r = process_dunder(source, line, n);
339 return r < 0 ? r : 0;
344 LLLLLLLL0011223344...\n
346 sep = memchr(line, '=', n);
351 /* replace \n with = */
353 log_trace("Received: %.*s", (int) n, line);
355 r = iovw_put(&source->iovw, line, n);
357 log_error("Failed to put line in iovect");
362 source->state = STATE_DATA_START;
363 return 0; /* continue */
366 case STATE_DATA_START:
367 assert(source->data_size == 0);
369 r = get_data_size(source);
370 // log_debug("get_data_size() -> %d", r);
374 source->state = STATE_EOF;
378 source->state = source->data_size > 0 ?
379 STATE_DATA : STATE_DATA_FINISH;
381 return 0; /* continue */
386 assert(source->data_size > 0);
388 r = get_data_data(source, &data);
389 // log_debug("get_data_data() -> %d", r);
393 source->state = STATE_EOF;
399 r = iovw_put(&source->iovw, data, source->data_size);
401 log_error("failed to put binary buffer in iovect");
405 source->state = STATE_DATA_FINISH;
407 return 0; /* continue */
410 case STATE_DATA_FINISH:
411 r = get_data_newline(source);
412 // log_debug("get_data_newline() -> %d", r);
416 source->state = STATE_EOF;
420 source->data_size = 0;
421 source->state = STATE_LINE;
423 return 0; /* continue */
425 assert_not_reached("wtf?");
429 int process_source(RemoteSource *source, bool compress, bool seal) {
430 size_t remain, target;
434 assert(source->writer);
436 r = process_data(source);
440 /* We have a full event */
441 log_trace("Received a full event from source@%p fd:%d (%s)",
442 source, source->fd, source->name);
444 if (!source->iovw.count) {
445 log_warning("Entry with no payload, skipping");
449 assert(source->iovw.iovec);
450 assert(source->iovw.count);
452 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
454 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
455 iovw_size(&source->iovw));
460 iovw_free_contents(&source->iovw);
462 /* possibly reset buffer position */
463 remain = source->filled - source->offset;
465 if (remain == 0) /* no brainer */
466 source->offset = source->scanned = source->filled = 0;
467 else if (source->offset > source->size - source->filled &&
468 source->offset > remain) {
469 memcpy(source->buf, source->buf + source->offset, remain);
470 source->offset = source->scanned = 0;
471 source->filled = remain;
474 target = source->size;
475 while (target > 16 * LINE_CHUNK && remain < target / 2)
477 if (target < source->size) {
480 tmp = realloc(source->buf, target);
482 log_warning("Failed to reallocate buffer to (smaller) size %zu",
485 log_debug("Reallocated buffer from %zu to %zu bytes",
486 source->size, target);
488 source->size = target;