1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
/* Chunk size in bytes: get_line() keeps at least this much free space in the
 * read buffer before calling read(), and process_source() only considers
 * shrinking buffers larger than 16 chunks.
 * The expansion is parenthesized so it stays a single operand when the macro
 * is used next to '/', '%' or other operators of equal or higher precedence. */
#define LINE_CHUNK (8*1024u)
/* Tear down a RemoteSource: close its fd when the fd is valid and not marked
 * passive, release the contents of its iovec wrapper, and drop the references
 * it holds on the writer and on the event source.
 * NOTE(review): this copy of the function has gaps (the embedded numbering
 * skips), so any guard or free() on the skipped lines is not described here. */
27 void source_free(RemoteSource *source) {
/* A passive fd is not closed here. */
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
38 iovw_free_contents(&source->iovw);
/* NOTE(review): source->writer is dereferenced for the log message and no
 * NULL check on it is visible — confirm every source always has a writer. */
40 log_debug("Writer ref count %i", source->writer->n_ref);
41 writer_unref(source->writer);
43 sd_event_source_unref(source->event);
49 * Initialize zero-filled source with given values. On success, takes
50 * ownership of fd and writer, otherwise does not touch them.
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
/* Logs the creation, allocates one RemoteSource with new0() and stores
 * passive_fd and writer in it.
 * NOTE(review): the declaration of `source`, the allocation-failure check,
 * the remaining field assignments and the return statement are not visible
 * in this copy (the embedded numbering skips) — confirm against the full file. */
56 log_debug("Creating source for %sfd:%d (%s)",
57 passive_fd ? "passive " : "", fd, name);
61 source = new0(RemoteSource, 1);
66 source->passive_fd = passive_fd;
68 source->writer = writer;
/* Resize source->buf through GREEDY_REALLOC (which is handed source->buf,
 * source->size and the requested size) and then rebase the iovec wrapper from
 * the old buffer address to the new one.
 * NOTE(review): the failure check and the return statement are not visible in
 * this copy; callers in this file treat a false/NULL result as failure. */
73 static char* realloc_buffer(RemoteSource *source, size_t size) {
/* Remember the previous address: the buffer may move during reallocation. */
74 char *b, *old = source->buf;
76 b = GREEDY_REALLOC(source->buf, source->size, size);
/* presumably rewrites iovec entries pointing into `old` so that they point
 * into the new buffer — verify against iovw_rebase() */
80 iovw_rebase(&source->iovw, old, source->buf);
/* Find the next '\n'-terminated line in source->buf, reading more bytes from
 * source->fd into the buffer when needed.
 * On the final path shown, *line points into the buffer at the old offset,
 * *size is the line length including the '\n', and source->offset is advanced
 * past the line. The line is handed out in place, not copied.
 * NOTE(review): the loop header, the local declarations and all return
 * statements are not visible in this copy (the embedded numbering skips), so
 * the return values are not documented here. */
85 static int get_line(RemoteSource *source, char **line, size_t *size) {
90 assert(source->state == STATE_LINE);
91 assert(source->offset <= source->filled);
92 assert(source->filled <= source->size);
93 assert(source->buf == NULL || source->size > 0);
94 assert(source->fd >= 0);
/* Start the search after whatever was already scanned, but never before the
 * current read offset. */
98 size_t start = MAX(source->scanned, source->offset);
100 c = memchr(source->buf + start, '\n',
101 source->filled - start);
/* Record that everything buffered so far has been searched (presumably only
 * reached when no newline was found — the branch is not visible here). */
106 source->scanned = source->filled;
/* Give up on input that reaches DATA_SIZE_MAX scanned bytes. */
107 if (source->scanned >= DATA_SIZE_MAX) {
108 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
112 if (source->passive_fd)
113 /* we have to wait for some data to come to us */
/* Grow the buffer when fewer than LINE_CHUNK bytes are free; the requested
 * size is capped at ENTRY_SIZE_MAX. */
116 if (source->size - source->filled < LINE_CHUNK &&
117 !realloc_buffer(source,
118 MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
121 assert(source->size - source->filled >= LINE_CHUNK ||
122 source->size == ENTRY_SIZE_MAX);
/* Read into the free tail of the buffer. */
124 n = read(source->fd, source->buf + source->filled,
125 source->size - source->filled);
/* EAGAIN / EWOULDBLOCK are not logged as errors. */
127 if (errno != EAGAIN && errno != EWOULDBLOCK)
128 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
129 source->size - source->filled);
/* c points at the newline; the size counts it (c + 1). */
137 *line = source->buf + source->offset;
138 *size = c + 1 - source->buf - source->offset;
139 source->offset += *size;
/* Append `size` bytes from `data` to the source's buffer: the buffer is first
 * resized to hold source->filled + size bytes, then the bytes are copied in
 * after the already filled part and source->filled is increased.
 * An error is logged if the buffer cannot be resized.
 * NOTE(review): the return statements are not visible in this copy. */
144 int push_data(RemoteSource *source, const char *data, size_t size) {
/* Data must not be pushed into a source that is already in STATE_EOF. */
146 assert(source->state != STATE_EOF);
148 if (!realloc_buffer(source, source->filled + size)) {
149 log_error("Failed to store received data of size %zu "
150 "(in addition to existing %zu bytes with %zu filled): %s",
151 size, source->size, source->filled, strerror(ENOMEM));
155 memcpy(source->buf + source->filled, data, size);
156 source->filled += size;
/* Make sure at least `size` unread bytes are buffered past source->offset,
 * reading from source->fd while there are fewer. On the final path shown,
 * *data points into the buffer at the old offset and source->offset is
 * advanced by `size`.
 * Only valid in the three binary-field states, and `size` must not exceed
 * DATA_SIZE_MAX (both asserted).
 * NOTE(review): the return statements and the handling of read() results
 * other than the error log are not visible in this copy. */
161 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
164 assert(source->state == STATE_DATA_START ||
165 source->state == STATE_DATA ||
166 source->state == STATE_DATA_FINISH);
167 assert(size <= DATA_SIZE_MAX);
168 assert(source->offset <= source->filled);
169 assert(source->filled <= source->size);
170 assert(source->buf != NULL || source->size == 0);
171 assert(source->buf == NULL || source->size > 0);
172 assert(source->fd >= 0);
/* Loop while fewer than `size` unread bytes are available. */
175 while (source->filled - source->offset < size) {
178 if (source->passive_fd)
179 /* we have to wait for some data to come to us */
/* Ask for a buffer large enough to hold the whole item starting at offset. */
182 if (!realloc_buffer(source, source->offset + size))
185 n = read(source->fd, source->buf + source->filled,
186 source->size - source->filled);
/* EAGAIN / EWOULDBLOCK are not logged as errors. */
188 if (errno != EAGAIN && errno != EWOULDBLOCK)
189 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
190 source->size - source->filled);
/* The result is handed out in place; the pointer refers to source->buf. */
198 *data = source->buf + source->offset;
199 source->offset += size;
/* Read the 64-bit little-endian length that precedes a binary field's data
 * and store it, converted to host order, in source->data_size. A length above
 * DATA_SIZE_MAX is logged as an error; a zero length is logged as a warning.
 * NOTE(review): the local declarations and the return statements are not
 * visible in this copy. */
204 static int get_data_size(RemoteSource *source) {
209 assert(source->state == STATE_DATA_START);
210 assert(source->data_size == 0);
212 r = fill_fixed_size(source, &data, sizeof(uint64_t));
/* NOTE(review): `data` points into source->buf at source->offset, which has
 * no alignment guarantee; dereferencing it as uint64_t* may be a misaligned
 * access. Consider memcpy() into a local uint64_t before le64toh(). */
216 source->data_size = le64toh( *(uint64_t *) data );
217 if (source->data_size > DATA_SIZE_MAX) {
218 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
219 source->data_size, DATA_SIZE_MAX);
222 if (source->data_size == 0)
223 log_warning("Binary field with zero length");
/* Fetch the payload of a binary field: asks fill_fixed_size() for
 * source->data_size bytes and passes the resulting pointer back via *data.
 * NOTE(review): the handling of r and the return statement are not visible
 * in this copy. */
228 static int get_data_data(RemoteSource *source, void **data) {
233 assert(source->state == STATE_DATA);
235 r = fill_fixed_size(source, data, source->data_size);
/* Consume the single byte that follows a binary field's data and complain
 * if it is not a newline.
 * NOTE(review): the comparison guarding the error message and the return
 * statements are not visible in this copy. */
242 static int get_data_newline(RemoteSource *source) {
247 assert(source->state == STATE_DATA_FINISH);
/* Exactly one byte is requested. */
249 r = fill_fixed_size(source, (void**) &data, 1);
255 log_error("expected newline, got '%c'", *data);
/* Handle lines whose field name starts with a double underscore.
 * The line is matched against "__CURSOR=", "__REALTIME_TIMESTAMP=",
 * "__MONOTONIC_TIMESTAMP=" and finally the bare "__" prefix. The two
 * timestamps are parsed with safe_atollu() and stored in source->ts; a parse
 * failure is logged as a warning. For a timestamp line the function returns
 * the negative parse error, or 1 on success.
 * NOTE(review): the `if` lines guarding each branch and the other return
 * statements are not visible in this copy. */
262 static int process_dunder(RemoteSource *source, char *line, size_t n) {
263 const char *timestamp;
/* The caller must pass a complete line, newline included. */
268 assert(line[n-1] == '\n');
270 /* XXX: is it worth to support timestamps in extended format?
271 * We don't produce them, but who knows... */
273 timestamp = startswith(line, "__CURSOR=");
275 /* ignore __CURSOR */
278 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
280 long long unsigned x;
/* `timestamp` is the text following the '='. */
282 r = safe_atollu(timestamp, &x);
284 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
286 source->ts.realtime = x;
287 return r < 0 ? r : 1;
290 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
292 long long unsigned x;
294 r = safe_atollu(timestamp, &x);
296 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
298 source->ts.monotonic = x;
299 return r < 0 ? r : 1;
/* Any other "__" field is only reported. */
302 timestamp = startswith(line, "__");
304 log_notice("Unknown dunder line %s", line);
/* Advance the parser by one step, dispatching on source->state.
 * Line state: get a line, pass it to process_dunder(), then either queue it
 * in the iovec as a text field or start a binary field.
 * STATE_DATA_START: read the binary field's length.
 * Data state: read the payload and queue name plus payload in the iovec.
 * STATE_DATA_FINISH: consume the trailing newline, then go back to STATE_LINE.
 * The visible "continue" paths return 0.
 * NOTE(review): two case labels, the local declarations, the conditions
 * around the STATE_EOF assignments and several returns are not visible in
 * this copy (the embedded numbering skips). */
312 int process_data(RemoteSource *source) {
315 switch(source->state) {
320 assert(source->data_size == 0);
322 r = get_line(source, &line, &n);
/* presumably taken when get_line() reports end of input — condition not
 * visible here */
326 source->state = STATE_EOF;
330 assert(line[n-1] == '\n');
333 log_trace("Received empty line, event is ready");
337 r = process_dunder(source, line, n);
339 return r < 0 ? r : 0;
344 LLLLLLLL0011223344...\n
/* A line containing '=' is logged as "text" below, one without as "binary". */
346 sep = memchr(line, '=', n);
351 r = iovw_put(&source->iovw, line, n);
355 /* replace \n with = */
/* Remember the field-name length; it is needed once the payload has arrived. */
358 source->field_len = n;
359 source->state = STATE_DATA_START;
361 /* we cannot put the field in iovec until we have all data */
364 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
366 return 0; /* continue */
369 case STATE_DATA_START:
370 assert(source->data_size == 0);
372 r = get_data_size(source);
373 // log_debug("get_data_size() -> %d", r);
377 source->state = STATE_EOF;
/* A zero-length field has no payload to read, so skip to the newline. */
381 source->state = source->data_size > 0 ?
382 STATE_DATA : STATE_DATA_FINISH;
384 return 0; /* continue */
390 assert(source->data_size > 0);
392 r = get_data_data(source, &data);
393 // log_debug("get_data_data() -> %d", r);
397 source->state = STATE_EOF;
/* The buffer holds the field name (field_len bytes), the 8-byte length and
 * then the payload. Move the name forward over the length so that name and
 * payload are contiguous, and queue them as one iovec entry. */
403 field = (char*) data - sizeof(uint64_t) - source->field_len;
404 memmove(field + sizeof(uint64_t), field, source->field_len);
406 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
410 source->state = STATE_DATA_FINISH;
412 return 0; /* continue */
415 case STATE_DATA_FINISH:
416 r = get_data_newline(source);
417 // log_debug("get_data_newline() -> %d", r);
421 source->state = STATE_EOF;
/* Reset for the next field; STATE_DATA_START asserts data_size == 0. */
425 source->data_size = 0;
426 source->state = STATE_LINE;
428 return 0; /* continue */
430 assert_not_reached("wtf?");
/* Run process_data() on the source and, once a full event has been collected
 * in source->iovw, hand it to writer_write() together with the timestamps and
 * the compress/seal flags. Afterwards the iovec contents are released and the
 * read buffer is tidied: positions are reset or the unread remainder is moved
 * to the front, and an oversized buffer may be reallocated to a smaller size.
 * NOTE(review): the checks on r, the return statements and parts of the
 * shrink logic are not visible in this copy (the embedded numbering skips). */
434 int process_source(RemoteSource *source, bool compress, bool seal) {
435 size_t remain, target;
439 assert(source->writer);
441 r = process_data(source);
445 /* We have a full event */
446 log_trace("Received a full event from source@%p fd:%d (%s)",
447 source, source->fd, source->name);
449 if (!source->iovw.count) {
450 log_warning("Entry with no payload, skipping");
454 assert(source->iovw.iovec);
455 assert(source->iovw.count);
457 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
459 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
460 iovw_size(&source->iovw));
465 iovw_free_contents(&source->iovw);
467 /* possibly reset buffer position */
/* remain = bytes received but not yet consumed by the parser. */
468 remain = source->filled - source->offset;
470 if (remain == 0) /* no brainer */
471 source->offset = source->scanned = source->filled = 0;
/* Compact only when the consumed prefix is larger than both the free tail and
 * the remainder; offset > remain also means source and destination of the
 * copy cannot overlap, which memcpy() requires. */
472 else if (source->offset > source->size - source->filled &&
473 source->offset > remain) {
474 memcpy(source->buf, source->buf + source->offset, remain);
475 source->offset = source->scanned = 0;
476 source->filled = remain;
/* Look for a smaller size: only buffers above 16 * LINE_CHUNK that are less
 * than half used qualify (the loop body is not visible here — presumably it
 * reduces target). */
479 target = source->size;
480 while (target > 16 * LINE_CHUNK && remain < target / 2)
482 if (target < source->size) {
/* The result goes into a temporary first; the failure branch only logs a
 * warning. */
485 tmp = realloc(source->buf, target);
487 log_warning("Failed to reallocate buffer to (smaller) size %zu",
490 log_debug("Reallocated buffer from %zu to %zu bytes",
491 source->size, target);
493 source->size = target;