1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
/* Granularity, in bytes, used when growing or allocating the receive buffer.
 * The expansion is parenthesized so that the macro behaves as a single
 * operand in any surrounding expression (e.g. next to '/', '%' or '-'). */
#define LINE_CHUNK (8*1024u)
/* Release the resources attached to a RemoteSource.
 *
 * Steps visible here: a valid descriptor (fd >= 0) is logged and closed with
 * safe_close(), the contents of the collected iovec are freed, and the
 * reference on the sd-event source is dropped.
 *
 * NOTE(review): releasing source->name, source->buf and the structure itself
 * is not visible in this view -- confirm it happens in the full function. */
27 void source_free(RemoteSource *source) {
/* A negative fd means there is no descriptor to close. */
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
/* Drop any fields gathered for an entry that was never written out. */
37 iovw_free_contents(&source->iovw);
39 sd_event_source_unref(source->event);
/* Extract one newline-terminated line from the front of the source's buffer,
 * reading more bytes from source->fd when no newline has been buffered yet.
 *
 * Only valid in STATE_LINE (asserted). On success *size receives the length
 * of the line including its trailing '\n'. Bytes that follow the newline are
 * copied into a newly allocated buffer which then replaces source->buf, so
 * the next call starts at the beginning of the following line.
 *
 * NOTE(review): the assignment of *line and the return statements are not
 * visible in this view; presumably *line takes over the old buffer and the
 * caller becomes responsible for it -- confirm. */
44 static int get_line(RemoteSource *source, char **line, size_t *size) {
51 assert(source->state == STATE_LINE);
52 assert(source->filled <= source->size);
53 assert(source->buf == NULL || source->size > 0);
/* Look for a newline only in the part of the buffer that has not been
 * scanned by an earlier pass. */
57 c = memchr(source->buf + source->scanned, '\n',
58 source->filled - source->scanned);
/* Everything buffered so far has now been examined. */
62 source->scanned = source->filled;
/* A single line may not grow beyond DATA_SIZE_MAX bytes. */
63 if (source->scanned >= DATA_SIZE_MAX) {
64 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
69 /* we have to wait for some data to come to us */
/* Enlarge the buffer when fewer than LINE_CHUNK bytes are free; the
 * requested size is capped at DATA_SIZE_MAX. */
72 if (source->size - source->filled < LINE_CHUNK &&
73 !GREEDY_REALLOC(source->buf, source->size,
74 MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
77 assert(source->size - source->filled >= LINE_CHUNK ||
78 source->size == DATA_SIZE_MAX);
80 // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
81 // to accommodate such big fields.
/* Append whatever the descriptor delivers to the free tail of the buffer. */
83 n = read(source->fd, source->buf + source->filled,
84 source->size - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged -- presumably the normal "no data yet"
 * result on a non-blocking descriptor.
 * NOTE(review): the value printed with %zd is a size_t expression; push_data()
 * prints the same fields with %zu -- consider aligning the format. */
86 if (errno != EAGAIN && errno != EWOULDBLOCK)
87 log_error("read(%d, ..., %zd): %m", source->fd,
88 source->size - source->filled);
/* Line length, counting the terminating newline that c points at. */
97 *size = c + 1 - source->buf;
99 /* Check if something remains */
100 remain = source->buf + source->filled - c - 1;
/* The replacement buffer is at least LINE_CHUNK bytes so that the next read
 * has room to work with. */
103 newsize = MAX(remain, LINE_CHUNK);
104 newbuf = malloc(newsize);
107 memcpy(newbuf, c + 1, remain);
/* Switch the source over to the buffer holding only the leftover bytes. */
109 source->buf = newbuf;
110 source->size = newsize;
111 source->filled = remain;
/* Append size bytes from data to the source's buffer.
 *
 * The buffer is grown with GREEDY_REALLOC so that it can hold the bytes
 * already stored (source->filled) plus the new ones; if that fails an error
 * is logged. The source must not be in STATE_EOF (asserted).
 *
 * NOTE(review): the return statements are not visible in this view. */
117 int push_data(RemoteSource *source, const char *data, size_t size) {
119 assert(source->state != STATE_EOF);
121 if (!GREEDY_REALLOC(source->buf, source->size,
122 source->filled + size)) {
123 log_error("Failed to store received data of size %zu "
124 "(in addition to existing %zu bytes with %zu filled): %s",
125 size, source->size, source->filled, strerror(ENOMEM));
/* Copy the new bytes directly behind the data that is already buffered. */
129 memcpy(source->buf + source->filled, data, size);
130 source->filled += size;
/* Obtain exactly size bytes from the front of the source's buffer, reading
 * from source->fd until at least that many bytes have been buffered.
 *
 * Valid in STATE_DATA_START, STATE_DATA and STATE_DATA_FINISH (asserted);
 * size must not exceed DATA_SIZE_MAX. Bytes buffered beyond the first size
 * are copied into a newly allocated buffer which replaces source->buf.
 *
 * NOTE(review): the assignment of *data and the return statements are not
 * visible in this view; presumably *data receives the old buffer and the
 * caller frees it -- confirm. */
135 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
138 size_t newsize = 0, remain;
141 assert(source->state == STATE_DATA_START ||
142 source->state == STATE_DATA ||
143 source->state == STATE_DATA_FINISH);
144 assert(size <= DATA_SIZE_MAX);
145 assert(source->filled <= source->size);
146 assert(source->scanned <= source->filled);
147 assert(source->buf != NULL || source->size == 0);
148 assert(source->buf == NULL || source->size > 0);
/* Keep reading until the requested number of bytes is available. */
151 while(source->filled < size) {
153 /* we have to wait for some data to come to us */
/* Make sure the buffer can hold the whole request. */
156 if (!GREEDY_REALLOC(source->buf, source->size, size))
159 n = read(source->fd, source->buf + source->filled,
160 source->size - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged -- presumably the normal "no data yet"
 * result on a non-blocking descriptor.
 * NOTE(review): the value printed with %zd is a size_t expression; consider
 * %zu, as used by push_data() for the same fields. */
162 if (errno != EAGAIN && errno != EWOULDBLOCK)
163 log_error("read(%d, ..., %zd): %m", source->fd,
164 source->size - source->filled);
174 /* Check if something remains */
175 assert(size <= source->filled);
176 remain = source->filled - size;
/* The replacement buffer is at least LINE_CHUNK bytes large. */
178 newsize = MAX(remain, LINE_CHUNK);
179 newbuf = malloc(newsize);
182 memcpy(newbuf, source->buf + size, remain);
/* Switch the source over to the buffer holding only the surplus bytes. */
184 source->buf = newbuf;
185 source->size = newsize;
186 source->filled = remain;
/* Read the length prefix of a binary field and store it in source->data_size.
 *
 * The prefix is sizeof(uint64_t) bytes, fetched with fill_fixed_size() and
 * decoded as little-endian. A length above DATA_SIZE_MAX is reported as an
 * error; a length of zero only produces a warning. Only valid in
 * STATE_DATA_START with data_size still zero (asserted).
 *
 * NOTE(review): the return statements are not visible in this view. */
192 static int get_data_size(RemoteSource *source) {
/* _cleanup_free_: presumably releases the temporary buffer automatically
 * when the function returns -- confirm against the macro definition. */
194 _cleanup_free_ void *data = NULL;
197 assert(source->state == STATE_DATA_START);
198 assert(source->data_size == 0);
200 r = fill_fixed_size(source, &data, sizeof(uint64_t));
/* The wire format stores the length little-endian; convert to host order. */
204 source->data_size = le64toh( *(uint64_t *) data );
205 if (source->data_size > DATA_SIZE_MAX) {
206 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
207 source->data_size, DATA_SIZE_MAX);
210 if (source->data_size == 0)
211 log_warning("Binary field with zero length");
/* Fetch the payload of the current binary field.
 *
 * Requests source->data_size bytes through fill_fixed_size(), which hands the
 * result back via *data. Only valid in STATE_DATA (asserted).
 *
 * NOTE(review): handling of the result r is not visible in this view. */
216 static int get_data_data(RemoteSource *source, void **data) {
221 assert(source->state == STATE_DATA);
223 r = fill_fixed_size(source, data, source->data_size);
/* Consume the single byte that follows a binary payload.
 *
 * One byte is fetched with fill_fixed_size(); judging by the error message it
 * is expected to be a newline, and any other character is logged as an
 * error. Only valid in STATE_DATA_FINISH (asserted).
 *
 * NOTE(review): the comparison and the return statements are not visible in
 * this view. */
230 static int get_data_newline(RemoteSource *source) {
232 _cleanup_free_ char *data = NULL;
235 assert(source->state == STATE_DATA_FINISH);
237 r = fill_fixed_size(source, (void**) &data, 1);
243 log_error("expected newline, got '%c'", *data);
/* Handle the metadata lines whose name starts with a double underscore.
 *
 * line holds n bytes and must end in '\n' (asserted). The prefixes are tried
 * in this order:
 *   "__CURSOR="               recognized and ignored
 *   "__REALTIME_TIMESTAMP="   parsed with safe_atollu() into
 *                             source->ts.realtime
 *   "__MONOTONIC_TIMESTAMP="  parsed with safe_atollu() into
 *                             source->ts.monotonic
 *   "__" (anything else)      logged at notice level
 *
 * For the two timestamp lines the function returns the negative parse error
 * if there was one and 1 otherwise.
 *
 * NOTE(review): the return values for the remaining cases are not visible in
 * this view. */
250 static int process_dunder(RemoteSource *source, char *line, size_t n) {
251 const char *timestamp;
256 assert(line[n-1] == '\n');
258 /* XXX: is it worth to support timestamps in extended format?
259 * We don't produce them, but who knows... */
/* startswith() is used as "prefix test plus pointer to the text after the
 * prefix"; the variable name timestamp is reused for all four probes. */
261 timestamp = startswith(line, "__CURSOR=");
263 /* ignore __CURSOR */
266 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
268 long long unsigned x;
270 r = safe_atollu(timestamp, &x);
272 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
274 source->ts.realtime = x;
/* Negative r reports the parse failure; 1 marks the line as handled. */
275 return r < 0 ? r : 1;
278 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
280 long long unsigned x;
282 r = safe_atollu(timestamp, &x);
284 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
286 source->ts.monotonic = x;
/* Negative r reports the parse failure; 1 marks the line as handled. */
287 return r < 0 ? r : 1;
/* Any other double-underscore line is unknown and only reported. */
290 timestamp = startswith(line, "__");
292 log_notice("Unknown dunder line %s", line);
/* Advance the parser of one source by a single step, driven by
 * source->state.
 *
 * Steps visible here:
 *   - line step (get_line() asserts STATE_LINE): read one line; an empty line
 *     means the entry is complete; double-underscore lines go to
 *     process_dunder(); a line that is stored is appended to source->iovw;
 *     otherwise the parser moves to STATE_DATA_START.
 *   - STATE_DATA_START: read the length prefix, then continue with
 *     STATE_DATA, or directly with STATE_DATA_FINISH for a zero length.
 *   - data step (get_data_data() asserts STATE_DATA): read the payload,
 *     append it to source->iovw, continue with STATE_DATA_FINISH.
 *   - STATE_DATA_FINISH: consume the trailing newline, reset data_size and
 *     return to STATE_LINE.
 *
 * A return value of 0 means "continue", i.e. call again. Several branches
 * switch to STATE_EOF.
 *
 * NOTE(review): several case labels, the branch conditions and the remaining
 * return values are not visible in this view -- in particular the conditions
 * leading to STATE_EOF and the value signalling a complete entry; confirm. */
300 int process_data(RemoteSource *source) {
303 switch(source->state) {
/* Line step: no binary field may be in progress while reading lines. */
308 assert(source->data_size == 0);
310 r = get_line(source, &line, &n);
/* Presumably reached when get_line() reports end of input -- confirm. */
314 source->state = STATE_EOF;
318 assert(line[n-1] == '\n');
321 log_debug("Received empty line, event is ready");
326 r = process_dunder(source, line, n);
/* Errors are passed on; otherwise ask to be called again. */
329 return r < 0 ? r : 0;
/* Layout sketch of a binary field; presumably eight length bytes followed by
 * the payload and a newline, matching the sizeof(uint64_t) read in
 * get_data_size() -- confirm. */
335 LLLLLLLL0011223344...\n
/* Look for the '=' separator within the line. */
337 sep = memchr(line, '=', n);
342 /* replace \n with = */
344 log_debug("Received: %.*s", (int) n, line);
/* Append the line to the iovec describing the entry being assembled. */
346 r = iovw_put(&source->iovw, line, n);
348 log_error("Failed to put line in iovect");
/* Presumably taken when the line is the name of a binary field; the length
 * prefix is read next -- confirm. */
354 source->state = STATE_DATA_START;
355 return 0; /* continue */
358 case STATE_DATA_START:
359 assert(source->data_size == 0);
361 r = get_data_size(source);
362 log_debug("get_data_size() -> %d", r);
366 source->state = STATE_EOF;
/* A zero-length field has no payload; skip straight to the newline. */
370 source->state = source->data_size > 0 ?
371 STATE_DATA : STATE_DATA_FINISH;
373 return 0; /* continue */
/* Data step: the length prefix has been read and is non-zero. */
378 assert(source->data_size > 0);
380 r = get_data_data(source, &data);
381 log_debug("get_data_data() -> %d", r);
385 source->state = STATE_EOF;
/* Append the payload to the iovec describing the entry being assembled. */
391 r = iovw_put(&source->iovw, data, source->data_size);
393 log_error("failed to put binary buffer in iovect");
397 source->state = STATE_DATA_FINISH;
399 return 0; /* continue */
402 case STATE_DATA_FINISH:
403 r = get_data_newline(source);
404 log_debug("get_data_newline() -> %d", r);
408 source->state = STATE_EOF;
/* Field complete: reset the length and go back to reading lines. */
412 source->data_size = 0;
413 source->state = STATE_LINE;
415 return 0; /* continue */
417 assert_not_reached("wtf?");
/* Run one parsing step for source and write out the entry once it is
 * complete.
 *
 * process_data() is called first. When a full entry has been received it is
 * passed to writer_write() together with the timestamps in source->ts and
 * the compress/seal flags; a failure is logged with the entry size. An entry
 * without any field is skipped with a warning. The iovec contents are freed
 * with iovw_free_contents() at the end of the visible code.
 *
 * NOTE(review): the checks on r and the return statements are not visible in
 * this view. */
421 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
427 r = process_data(source);
431 /* We have a full event */
432 log_debug("Received a full event from source@%p fd:%d (%s)",
433 source, source->fd, source->name);
/* Nothing was collected for this entry, so there is nothing to write. */
435 if (!source->iovw.count) {
436 log_warning("Entry with no payload, skipping");
440 assert(source->iovw.iovec);
441 assert(source->iovw.count);
443 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
445 log_error("Failed to write entry of %zu bytes: %s",
446 iovw_size(&source->iovw), strerror(-r));
/* Release the collected fields so the next entry starts empty. */
451 iovw_free_contents(&source->iovw);