1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
/* Buffer granularity in bytes: get_line() wants at least this much free space
 * in source->buf before calling read(), and the replacement buffers allocated
 * by get_line() and fill_fixed_size() are never smaller than this. */
25 #define LINE_CHUNK 1024u
/* Tear down a RemoteSource: if source->fd is non-negative it is logged and
 * passed to safe_close(), then iovw_free_contents() is called on source->iovw
 * and sd_event_source_unref() on source->event.
 * NOTE(review): this listing omits lines, so whether source->buf and source
 * itself are released here cannot be confirmed from the visible code. */
27 void source_free(RemoteSource *source) {
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
37 iovw_free_contents(&source->iovw);
39 sd_event_source_unref(source->event);
/* Extract one '\n'-terminated line from source->buf, reading more data from
 * source->fd while no newline has been buffered yet.
 *
 * Must be called in STATE_LINE. When a newline is found, *size is set to the
 * line length including the newline, and the bytes following it are copied
 * into a newly malloc()ed buffer (at least LINE_CHUNK bytes) that replaces
 * source->buf; source->size and source->filled are updated to match.
 * NOTE(review): the lines that assign *line, reset source->scanned and return
 * are not visible in this listing; presumably the old buffer is handed to the
 * caller through *line -- confirm. */
44 static int get_line(RemoteSource *source, char **line, size_t *size) {
51 assert(source->state == STATE_LINE);
52 assert(source->filled <= source->size);
53 assert(source->buf == NULL || source->size > 0);
/* Only the bytes in [scanned, filled) are searched, so data examined on a
 * previous pass is not scanned again. */
57 c = memchr(source->buf + source->scanned, '\n',
58 source->filled - source->scanned);
/* Everything buffered so far has been searched; remember that, and give up
 * with an error once DATA_SIZE_MAX bytes were scanned without a newline. */
62 source->scanned = source->filled;
63 if (source->scanned >= DATA_SIZE_MAX) {
64 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
69 /* we have to wait for some data to come to us */
/* Grow the buffer only when fewer than LINE_CHUNK bytes are free.
 * NOTE(review): the requested size is MAX(filled + LINE_CHUNK, DATA_SIZE_MAX),
 * so any growth jumps straight to at least DATA_SIZE_MAX bytes; check whether
 * a smaller target (e.g. MIN) was intended. */
72 if (source->size - source->filled < LINE_CHUNK &&
73 !GREEDY_REALLOC(source->buf, source->size,
74 MAX(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
77 assert(source->size - source->filled >= LINE_CHUNK);
/* NOTE(review): the length passed to read() is
 * MAX(size, DATA_SIZE_MAX) - filled, but only size - filled bytes are free in
 * source->buf. When the reallocation above was skipped and source->size is
 * smaller than DATA_SIZE_MAX (e.g. right after the LINE_CHUNK-sized buffer
 * installed at the end of this function), read() may write past the end of
 * the allocation. This looks like a heap overflow; the length should probably
 * be source->size - source->filled -- confirm and fix. */
79 n = read(source->fd, source->buf + source->filled,
80 MAX(source->size, DATA_SIZE_MAX) - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged as errors.
 * NOTE(review): the length printed here (size - filled) is not the length
 * that was actually passed to read() above. */
82 if (errno != EAGAIN && errno != EWOULDBLOCK)
83 log_error("read(%d, ..., %zd): %m", source->fd,
84 source->size - source->filled);
/* Line length counts the terminating newline (c points at the '\n'). */
93 *size = c + 1 - source->buf;
95 /* Check if something remains */
96 remain = source->buf + source->filled - c - 1;
/* The leftover bytes after the newline move to a fresh buffer, which then
 * becomes the source's buffer. */
99 newsize = MAX(remain, LINE_CHUNK);
100 newbuf = malloc(newsize);
103 memcpy(newbuf, c + 1, remain);
105 source->buf = newbuf;
106 source->size = newsize;
107 source->filled = remain;
/* Append `size` bytes from `data` to the source's buffer.
 *
 * The buffer is grown with GREEDY_REALLOC() to hold at least
 * source->filled + size bytes; if that fails an error mentioning ENOMEM is
 * logged. On success the bytes are copied to the end of the filled region and
 * source->filled is advanced by `size`. Must not be called once the source is
 * in STATE_EOF.
 * NOTE(review): the return statements are not visible in this listing. */
113 int push_data(RemoteSource *source, const char *data, size_t size) {
115 assert(source->state != STATE_EOF);
117 if (!GREEDY_REALLOC(source->buf, source->size,
118 source->filled + size)) {
119 log_error("Failed to store received data of size %zu "
120 "(in addition to existing %zu bytes with %zu filled): %s",
121 size, source->size, source->filled, strerror(ENOMEM));
125 memcpy(source->buf + source->filled, data, size);
126 source->filled += size;
/* Make sure at least `size` bytes are buffered, reading from source->fd as
 * needed, then split them off the front of the buffer.
 *
 * Valid only in STATE_DATA_START, STATE_DATA and STATE_DATA_FINISH, and `size`
 * must not exceed DATA_SIZE_MAX. After the loop, the bytes beyond the first
 * `size` are copied into a newly malloc()ed buffer (at least LINE_CHUNK bytes)
 * that replaces source->buf; source->size and source->filled are updated.
 * NOTE(review): the assignment to *data and the return statements are not
 * visible here; presumably the old buffer (whose first `size` bytes are the
 * requested data) is handed to the caller via *data -- confirm. */
131 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
134 size_t newsize = 0, remain;
137 assert(source->state == STATE_DATA_START ||
138 source->state == STATE_DATA ||
139 source->state == STATE_DATA_FINISH);
140 assert(size <= DATA_SIZE_MAX);
141 assert(source->filled <= source->size);
142 assert(source->scanned <= source->filled);
143 assert(source->buf != NULL || source->size == 0);
144 assert(source->buf == NULL || source->size > 0);
/* Keep reading until the requested number of bytes is available. */
147 while(source->filled < size) {
149 /* we have to wait for some data to come to us */
/* Ensure the buffer can hold `size` bytes before reading into it. */
152 if (!GREEDY_REALLOC(source->buf, source->size, size))
/* Read into whatever free space the buffer has, which may be more than is
 * strictly needed to reach `size`. */
155 n = read(source->fd, source->buf + source->filled,
156 source->size - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged as errors. */
158 if (errno != EAGAIN && errno != EWOULDBLOCK)
159 log_error("read(%d, ..., %zd): %m", source->fd,
160 source->size - source->filled);
170 /* Check if something remains */
171 assert(size <= source->filled);
172 remain = source->filled - size;
/* Surplus bytes beyond `size` move to a fresh buffer that becomes the
 * source's buffer. */
174 newsize = MAX(remain, LINE_CHUNK);
175 newbuf = malloc(newsize);
178 memcpy(newbuf, source->buf + size, remain);
180 source->buf = newbuf;
181 source->size = newsize;
182 source->filled = remain;
/* Read the length prefix of a binary field.
 *
 * Obtains sizeof(uint64_t) bytes through fill_fixed_size(), interprets them as
 * a little-endian 64-bit integer and stores the result in source->data_size.
 * A value above DATA_SIZE_MAX is logged as an error; a value of zero only
 * produces a warning. Must be called in STATE_DATA_START with
 * source->data_size still zero.
 * NOTE(review): the checks on `r` and the return statements are not visible
 * in this listing. */
188 static int get_data_size(RemoteSource *source) {
/* The temporary holding the raw size bytes is released automatically when it
 * goes out of scope (_cleanup_free_). */
190 _cleanup_free_ void *data = NULL;
193 assert(source->state == STATE_DATA_START);
194 assert(source->data_size == 0);
196 r = fill_fixed_size(source, &data, sizeof(uint64_t));
/* Convert from little-endian wire order to host order. */
200 source->data_size = le64toh( *(uint64_t *) data );
201 if (source->data_size > DATA_SIZE_MAX) {
202 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
203 source->data_size, DATA_SIZE_MAX);
206 if (source->data_size == 0)
207 log_warning("Binary field with zero length");
/* Fetch the payload of a binary field: asks fill_fixed_size() for exactly
 * source->data_size bytes and passes the caller's `data` pointer through.
 * Must be called in STATE_DATA.
 * NOTE(review): handling of `r` and the return value are not visible here. */
212 static int get_data_data(RemoteSource *source, void **data) {
217 assert(source->state == STATE_DATA);
219 r = fill_fixed_size(source, data, source->data_size);
/* Consume the single byte that follows a binary field's payload, using
 * fill_fixed_size() with a size of 1. An error naming the offending character
 * is logged on the "expected newline" path. Must be called in
 * STATE_DATA_FINISH.
 * NOTE(review): the comparison that selects the error path and the return
 * statements are not visible in this listing. */
226 static int get_data_newline(RemoteSource *source) {
/* The one-byte temporary is released automatically (_cleanup_free_). */
228 _cleanup_free_ char *data = NULL;
231 assert(source->state == STATE_DATA_FINISH);
233 r = fill_fixed_size(source, (void**) &data, 1);
239 log_error("expected newline, got '%c'", *data);
/* Handle lines whose field name starts with a double underscore.
 *
 * `line` is an n-byte line ending in '\n'. The prefixes tested, in order, are
 * "__CURSOR=" (ignored), "__REALTIME_TIMESTAMP=" and "__MONOTONIC_TIMESTAMP="
 * (parsed with safe_atollu() and stored in source->ts), and finally any other
 * "__" prefix, which is reported with log_notice(). The two timestamp
 * branches return the negative parse error, or 1 on success.
 * NOTE(review): the `if` lines guarding each branch and the remaining return
 * statements are not visible in this listing. */
246 static int process_dunder(RemoteSource *source, char *line, size_t n) {
/* Despite its name, this holds the result of every prefix match below, not
 * only the timestamp ones. */
247 const char *timestamp;
252 assert(line[n-1] == '\n');
254 /* XXX: is it worth to support timestamps in extended format?
255 * We don't produce them, but who knows... */
257 timestamp = startswith(line, "__CURSOR=");
259 /* ignore __CURSOR */
262 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
264 long long unsigned x;
266 r = safe_atollu(timestamp, &x);
268 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
270 source->ts.realtime = x;
271 return r < 0 ? r : 1;
274 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
276 long long unsigned x;
278 r = safe_atollu(timestamp, &x);
280 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
282 source->ts.monotonic = x;
283 return r < 0 ? r : 1;
/* Catch-all for any other double-underscore field. */
286 timestamp = startswith(line, "__");
288 log_notice("Unknown dunder line %s", line);
/* Advance the parser by one step, dispatching on source->state.
 *
 * Visible steps: read a line with get_line() and hand it to process_dunder()
 * or store it in source->iovw; in STATE_DATA_START read the binary length
 * with get_data_size(); read the payload with get_data_data() and store it in
 * source->iovw; in STATE_DATA_FINISH consume the trailing byte with
 * get_data_newline() and go back to STATE_LINE. The steps return 0 to tell
 * the caller to continue, and several paths switch the source to STATE_EOF.
 * NOTE(review): some case labels, the conditions on `r` and most return
 * statements are missing from this listing, so the exact transitions and
 * return values must be confirmed against the full file. */
296 int process_data(RemoteSource *source) {
299 switch(source->state) {
304 assert(source->data_size == 0);
306 r = get_line(source, &line, &n);
310 source->state = STATE_EOF;
314 assert(line[n-1] == '\n');
317 log_debug("Received empty line, event is ready");
/* Negative results from process_dunder() are propagated; anything else is
 * turned into 0. */
322 r = process_dunder(source, line, n);
325 return r < 0 ? r : 0;
331 LLLLLLLL0011223344...\n
333 sep = memchr(line, '=', n);
338 /* replace \n with = */
340 log_debug("Received: %.*s", (int) n, line);
342 r = iovw_put(&source->iovw, line, n);
344 log_error("Failed to put line in iovect");
350 source->state = STATE_DATA_START;
351 return 0; /* continue */
354 case STATE_DATA_START:
355 assert(source->data_size == 0);
357 r = get_data_size(source);
358 log_debug("get_data_size() -> %d", r);
362 source->state = STATE_EOF;
/* A zero-length field has no payload to read, so it goes straight to the
 * trailing-byte step. */
366 source->state = source->data_size > 0 ?
367 STATE_DATA : STATE_DATA_FINISH;
369 return 0; /* continue */
374 assert(source->data_size > 0);
376 r = get_data_data(source, &data);
377 log_debug("get_data_data() -> %d", r);
381 source->state = STATE_EOF;
387 r = iovw_put(&source->iovw, data, source->data_size);
389 log_error("failed to put binary buffer in iovect");
393 source->state = STATE_DATA_FINISH;
395 return 0; /* continue */
398 case STATE_DATA_FINISH:
399 r = get_data_newline(source);
400 log_debug("get_data_newline() -> %d", r);
404 source->state = STATE_EOF;
/* Field complete: reset the size and resume line parsing. */
408 source->data_size = 0;
409 source->state = STATE_LINE;
411 return 0; /* continue */
413 assert_not_reached("wtf?");
/* Drive process_data() for one source and, once a full event has been
 * received, pass the collected fields in source->iovw together with
 * source->ts to writer_write(), forwarding `compress` and `seal`.
 *
 * An event with no fields is skipped with a warning; a writer failure is
 * logged with the entry size. iovw_free_contents() is called on source->iovw
 * afterwards.
 * NOTE(review): the checks on `r`, any labels/gotos and the return statement
 * are not visible in this listing. */
417 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
423 r = process_data(source);
427 /* We have a full event */
428 log_info("Received a full event from source@%p fd:%d (%s)",
429 source, source->fd, source->name);
431 if (!source->iovw.count) {
432 log_warning("Entry with no payload, skipping");
436 assert(source->iovw.iovec);
437 assert(source->iovw.count);
439 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
441 log_error("Failed to write entry of %zu bytes: %s",
442 iovw_size(&source->iovw), strerror(-r));
447 iovw_free_contents(&source->iovw);