1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
/* Buffer granularity in bytes: get_line() requires at least this much free
 * space before calling read(), and leftover data is moved into a new buffer
 * of at least this size. */
25 #define LINE_CHUNK 1024u
/* Tears down a RemoteSource.
 *
 * The visible lines log the descriptor and name when source->fd is
 * non-negative and release the contents of source->iovw.
 *
 * NOTE(review): this listing skips lines inside the function (the embedded
 * numbers jump 27, 31, 32, 37), so closing the descriptor and freeing the
 * remaining members presumably happens in the omitted lines -- confirm
 * against the full file. */
27 void source_free(RemoteSource *source) {
/* A negative fd is treated as "no descriptor attached". */
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
37 iovw_free_contents(&source->iovw);
/* Extracts the next newline-terminated line from the source's buffer.
 *
 * The unscanned part of source->buf is searched for '\n'. When more input is
 * needed the buffer is grown and data is read from source->fd. Once a newline
 * is found, *size receives the length of the line including the newline, and
 * any bytes following it are copied into a freshly allocated buffer which
 * then becomes source->buf.
 *
 * Requires source->state == STATE_LINE.
 *
 * NOTE(review): this listing skips lines inside the function (loop header,
 * several conditions, return statements, the assignment of *line). The
 * return values and who ends up owning the old buffer cannot be told from
 * here -- presumably *line receives the old buffer and the caller owns it;
 * confirm against the full file. */
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
48 assert(source->state == STATE_LINE);
49 assert(source->filled <= source->size);
50 assert(source->buf == NULL || source->size > 0);
/* Search only the bytes between scanned and filled, so data examined on an
 * earlier pass is not searched again. */
54 c = memchr(source->buf + source->scanned, '\n',
55 source->filled - source->scanned);
/* Presumably reached only when no newline was found (the check is not
 * visible): everything buffered so far counts as scanned. */
59 source->scanned = source->filled;
/* Refuse to keep buffering once DATA_SIZE_MAX bytes hold no newline. */
60 if (source->scanned >= DATA_SIZE_MAX) {
61 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
66 /* we have to wait for some data to come to us */
/* Grow the buffer only when fewer than LINE_CHUNK bytes are free.
 * NOTE(review): MAX(filled + LINE_CHUNK, DATA_SIZE_MAX) asks for at least
 * DATA_SIZE_MAX bytes every time the buffer is grown; MIN may have been
 * intended -- confirm. */
69 if (source->size - source->filled < LINE_CHUNK &&
70 !GREEDY_REALLOC(source->buf, source->size,
71 MAX(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
74 assert(source->size - source->filled >= LINE_CHUNK);
/* NOTE(review): the length passed to read() is
 * MAX(size, DATA_SIZE_MAX) - filled. Whenever size < DATA_SIZE_MAX that is
 * more than the free space (size - filled). This can happen when the
 * reallocation above was skipped because LINE_CHUNK bytes were already
 * free, e.g. with the LINE_CHUNK-sized leftover buffer created below.
 * Looks like a potential buffer overflow -- confirm. */
76 n = read(source->fd, source->buf + source->filled,
77 MAX(source->size, DATA_SIZE_MAX) - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged. The logged length is size - filled,
 * which is not necessarily the length that was passed to read() above. */
79 if (errno != EAGAIN && errno != EWOULDBLOCK)
80 log_error("read(%d, ..., %zd): %m", source->fd,
81 source->size - source->filled);
/* c points at the newline, so the reported length includes it. */
90 *size = c + 1 - source->buf;
92 /* Check if something remains */
/* Number of buffered bytes that follow the newline. */
93 remain = source->buf + source->filled - c - 1;
/* The leftover bytes move into a new buffer of at least LINE_CHUNK bytes. */
96 newsize = MAX(remain, LINE_CHUNK);
97 newbuf = malloc(newsize);
100 memcpy(newbuf, c + 1, remain);
/* The source continues with the new buffer; the old one is not freed in the
 * visible lines. */
102 source->buf = newbuf;
103 source->size = newsize;
104 source->filled = remain;
/* Appends size bytes from data to the source's buffer.
 *
 * The buffer is grown with GREEDY_REALLOC so that it can hold at least
 * source->filled + size bytes; when that fails an error mentioning ENOMEM is
 * logged. On success the bytes are copied behind the already buffered data
 * and source->filled is advanced by size.
 *
 * Must not be called once the source is in STATE_EOF.
 *
 * NOTE(review): the return statements are not part of this listing, so the
 * returned values cannot be told from here. */
110 int push_data(RemoteSource *source, const char *data, size_t size) {
112 assert(source->state != STATE_EOF);
114 if (!GREEDY_REALLOC(source->buf, source->size,
115 source->filled + size)) {
116 log_error("Failed to store received data of size %zu "
117 "(in addition to existing %zu bytes with %zu filled): %s",
118 size, source->size, source->filled, strerror(ENOMEM));
/* Copy behind the data that is already buffered. */
122 memcpy(source->buf + source->filled, data, size);
123 source->filled += size;
/* Collects exactly size bytes of input for the binary-field states.
 *
 * While fewer than size bytes are buffered, the buffer is grown to hold at
 * least size bytes and more data is read from source->fd. Afterwards the
 * bytes beyond the first size are copied into a freshly allocated buffer
 * which becomes source->buf.
 *
 * Requires the source to be in STATE_DATA_START, STATE_DATA or
 * STATE_DATA_FINISH, and size <= DATA_SIZE_MAX.
 *
 * NOTE(review): this listing skips lines inside the function (return
 * statements, the assignment of *data). Presumably *data receives the old
 * buffer holding the requested bytes and the caller owns it -- confirm
 * against the full file. */
128 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
131 size_t newsize = 0, remain;
134 assert(source->state == STATE_DATA_START ||
135 source->state == STATE_DATA ||
136 source->state == STATE_DATA_FINISH);
137 assert(size <= DATA_SIZE_MAX);
138 assert(source->filled <= source->size);
139 assert(source->scanned <= source->filled);
/* buf and size must agree: no buffer means size 0, and a buffer is never
 * empty. */
140 assert(source->buf != NULL || source->size == 0);
141 assert(source->buf == NULL || source->size > 0);
/* Keep reading until at least size bytes are buffered. */
144 while(source->filled < size) {
146 /* we have to wait for some data to come to us */
149 if (!GREEDY_REALLOC(source->buf, source->size, size))
/* Read into all of the free space, which may be more than is still
 * missing. */
152 n = read(source->fd, source->buf + source->filled,
153 source->size - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged. */
155 if (errno != EAGAIN && errno != EWOULDBLOCK)
156 log_error("read(%d, ..., %zd): %m", source->fd,
157 source->size - source->filled);
167 /* Check if something remains */
168 assert(size <= source->filled);
/* Bytes buffered beyond the requested size. */
169 remain = source->filled - size;
/* The surplus moves into a new buffer of at least LINE_CHUNK bytes. */
171 newsize = MAX(remain, LINE_CHUNK);
172 newbuf = malloc(newsize);
175 memcpy(newbuf, source->buf + size, remain);
/* The source continues with the new buffer; the old one is not freed in the
 * visible lines. */
177 source->buf = newbuf;
178 source->size = newsize;
179 source->filled = remain;
/* Reads the length prefix of a binary field.
 *
 * sizeof(uint64_t) bytes are fetched through fill_fixed_size(), converted
 * from little-endian with le64toh() and stored in source->data_size. A value
 * above DATA_SIZE_MAX is logged as an error, a value of zero as a warning.
 *
 * Requires source->state == STATE_DATA_START and source->data_size == 0.
 *
 * NOTE(review): the return statements are not part of this listing, so the
 * returned values cannot be told from here. */
185 static int get_data_size(RemoteSource *source) {
/* presumably released automatically at scope exit via _cleanup_free_ --
 * confirm against the macro's definition */
187 _cleanup_free_ void *data = NULL;
190 assert(source->state == STATE_DATA_START);
191 assert(source->data_size == 0);
193 r = fill_fixed_size(source, &data, sizeof(uint64_t));
/* NOTE(review): the cast assumes data is suitably aligned for uint64_t;
 * presumably true for a heap buffer -- confirm. */
197 source->data_size = le64toh( *(uint64_t *) data );
198 if (source->data_size > DATA_SIZE_MAX) {
199 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
200 source->data_size, DATA_SIZE_MAX);
/* A zero length is only warned about here. */
203 if (source->data_size == 0)
204 log_warning("Binary field with zero length");
/* Fetches the payload of a binary field: asks fill_fixed_size() for
 * source->data_size bytes and passes data through to it.
 *
 * Requires source->state == STATE_DATA.
 *
 * NOTE(review): the return statements are not part of this listing;
 * presumably the result of fill_fixed_size() is propagated -- confirm. */
209 static int get_data_data(RemoteSource *source, void **data) {
214 assert(source->state == STATE_DATA);
216 r = fill_fixed_size(source, data, source->data_size);
/* Consumes the single byte that follows a binary field's payload, fetched
 * through fill_fixed_size(). The error message shows that a newline is
 * expected there.
 *
 * Requires source->state == STATE_DATA_FINISH.
 *
 * NOTE(review): the comparison against the newline and the return
 * statements are not part of this listing. */
223 static int get_data_newline(RemoteSource *source) {
/* presumably released automatically at scope exit via _cleanup_free_ --
 * confirm against the macro's definition */
225 _cleanup_free_ char *data = NULL;
228 assert(source->state == STATE_DATA_FINISH);
230 r = fill_fixed_size(source, (void**) &data, 1);
236 log_error("expected newline, got '%c'", *data);
/* Handles lines whose field name starts with a double underscore.
 *
 * line holds n bytes and ends with a newline. __CURSOR= lines are ignored.
 * The values of __REALTIME_TIMESTAMP= and __MONOTONIC_TIMESTAMP= lines are
 * parsed with safe_atollu() and stored in source->ts.realtime and
 * source->ts.monotonic; those two branches return the negative parse result
 * on failure and 1 otherwise. Any other line starting with "__" is logged
 * at notice level.
 *
 * NOTE(review): this listing skips lines inside the function (the
 * conditions after each startswith() call, some return statements). The
 * result for lines that do not start with "__" is not visible. */
243 static int process_dunder(RemoteSource *source, char *line, size_t n) {
244 const char *timestamp;
249 assert(line[n-1] == '\n');
251 /* XXX: is it worth to support timestamps in extended format?
252 * We don't produce them, but who knows... */
254 timestamp = startswith(line, "__CURSOR=");
256 /* ignore __CURSOR */
259 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
261 long long unsigned x;
/* NOTE(review): safe_atollu() and the %s below presumably need a
 * NUL-terminated string, while the assert above only guarantees a trailing
 * newline. Confirm that the omitted lines terminate the buffer. */
263 r = safe_atollu(timestamp, &x);
265 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
267 source->ts.realtime = x;
268 return r < 0 ? r : 1;
271 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
273 long long unsigned x;
275 r = safe_atollu(timestamp, &x);
277 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
279 source->ts.monotonic = x;
280 return r < 0 ? r : 1;
/* Any other double-underscore field is only reported. */
283 timestamp = startswith(line, "__");
285 log_notice("Unknown dunder line %s", line);
/* Advances the parser of one source by a single step, selected by
 * source->state.
 *
 * Visible steps:
 *  - line step: get_line() fetches a line, process_dunder() gets to look at
 *    it, and the line is appended to source->iovw; a log message marks an
 *    empty line as the end of an event; the state can switch to
 *    STATE_DATA_START.
 *  - STATE_DATA_START: get_data_size() reads the length of a binary field;
 *    the next state is STATE_DATA for a non-zero length, otherwise
 *    STATE_DATA_FINISH.
 *  - data step: get_data_data() fetches the payload, which is appended to
 *    source->iovw; the next state is STATE_DATA_FINISH.
 *  - STATE_DATA_FINISH: get_data_newline() consumes the trailing byte,
 *    data_size is reset to 0 and the state returns to STATE_LINE.
 *
 * The "return 0" statements are marked "continue" by their comments.
 *
 * NOTE(review): this listing skips lines inside the function (some case
 * labels, the conditions guarding the STATE_EOF transitions and most return
 * statements), so the exact conditions and the other return values cannot be
 * told from here. */
293 int process_data(RemoteSource *source) {
296 switch(source->state) {
301 assert(source->data_size == 0);
303 r = get_line(source, &line, &n);
307 source->state = STATE_EOF;
311 assert(line[n-1] == '\n');
314 log_debug("Received empty line, event is ready");
/* Give the double-underscore handler a look at the line first. */
319 r = process_dunder(source, line, n);
322 return r < 0 ? r : 0;
/* NOTE(review): the next line looks like the tail of a format illustration
 * whose comment delimiters are not part of this listing. */
328 LLLLLLLL0011223344...\n
/* Look for the separator between field name and value. */
330 sep = memchr(line, '=', n);
335 /* replace \n with = */
337 log_debug("Received: %.*s", (int) n, line);
339 r = iovw_put(&source->iovw, line, n);
341 log_error("Failed to put line in iovect");
347 source->state = STATE_DATA_START;
348 return 0; /* continue */
351 case STATE_DATA_START:
352 assert(source->data_size == 0);
354 r = get_data_size(source);
355 log_debug("get_data_size() -> %d", r);
359 source->state = STATE_EOF;
/* A zero-length field has no payload, so the data step is skipped. */
363 source->state = source->data_size > 0 ?
364 STATE_DATA : STATE_DATA_FINISH;
366 return 0; /* continue */
371 assert(source->data_size > 0);
373 r = get_data_data(source, &data);
374 log_debug("get_data_data() -> %d", r);
378 source->state = STATE_EOF;
384 r = iovw_put(&source->iovw, data, source->data_size);
386 log_error("failed to put binary buffer in iovect");
390 source->state = STATE_DATA_FINISH;
392 return 0; /* continue */
395 case STATE_DATA_FINISH:
396 r = get_data_newline(source);
397 log_debug("get_data_newline() -> %d", r);
401 source->state = STATE_EOF;
/* Field complete: reset the length and go back to reading lines. */
405 source->data_size = 0;
406 source->state = STATE_LINE;
408 return 0; /* continue */
410 assert_not_reached("wtf?");
/* Runs one parsing step on source and, per the comment below, hands a
 * completed event to writer.
 *
 * After process_data() the visible lines log the event, warn about and skip
 * an entry whose iovw holds no items, pass source->iovw and source->ts to
 * writer_write() together with the compress and seal flags, log a write
 * failure, and release the contents of source->iovw.
 *
 * NOTE(review): this listing skips lines inside the function (the check on
 * the result of process_data(), the return statements) and its end is not
 * visible, so the returned values cannot be told from here. */
414 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
420 r = process_data(source);
424 /* We have a full event */
425 log_info("Received a full event from source@%p fd:%d (%s)",
426 source, source->fd, source->name);
428 if (!source->iovw.count) {
429 log_warning("Entry with no payload, skipping");
433 assert(source->iovw.iovec);
434 assert(source->iovw.count);
436 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
438 log_error("Failed to write entry of %zu bytes: %s",
439 iovw_size(&source->iovw), strerror(-r));
/* The collected items are released once the entry has been handled. */
444 iovw_free_contents(&source->iovw);