1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
/* Buffer granularity in bytes: get_line() keeps at least this much free
 * space in the buffer before calling read(), and the leftover buffers
 * allocated by get_line() and fill_fixed_size() are never smaller than this. */
25 #define LINE_CHUNK 1024u
/* Tear down a RemoteSource.
 *
 * When source->fd is valid (>= 0) a debug message naming the fd and the
 * source is logged, and iovw_free_contents() is called on source->iovw.
 *
 * NOTE(review): the statement that actually closes the fd and any frees of
 * other members are not visible in this excerpt - confirm against the full
 * file. */
27 void source_free(RemoteSource *source) {
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
37 iovw_free_contents(&source->iovw);
/* Hand out one newline-terminated line from the source.
 *
 * The data that is already buffered is searched for '\n' first. More data
 * is read from source->fd into the free tail of the buffer, which is grown
 * so that at least LINE_CHUNK bytes are available, and only the newly read
 * bytes are searched again. Once a newline is known, *size is set to the
 * length of the line including the newline, and the bytes buffered after it
 * are copied into a newly allocated buffer that becomes source->buf.
 *
 * Only callable in STATE_LINE (asserted).
 *
 * NOTE(review): this excerpt does not show the loop/branch structure, the
 * return statements, the assignment to *line, or what happens to the
 * previous buffer - presumably the old buffer is handed to the caller
 * through *line; confirm against the full file. */
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
48 assert(source->state == STATE_LINE);
49 assert(source->filled <= source->size);
50 assert(source->buf == NULL || source->size > 0);
/* search the data that is already buffered */
53 c = memchr(source->buf, '\n', source->filled);
60 /* we have to wait for some data to come to us */
/* make sure at least LINE_CHUNK bytes of free space exist before reading */
63 if (source->size - source->filled < LINE_CHUNK) {
64 // XXX: add check for maximum line length
66 if (!GREEDY_REALLOC(source->buf, source->size,
67 source->filled + LINE_CHUNK))
70 assert(source->size - source->filled >= LINE_CHUNK);
/* read into the free tail of the buffer */
72 n = read(source->fd, source->buf + source->filled,
73 source->size - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged as errors.
 * NOTE(review): the logged length looks like a difference of unsigned
 * sizes, for which %zu rather than %zd would be the matching conversion -
 * confirm the member types. */
75 if (errno != EAGAIN && errno != EWOULDBLOCK)
76 log_error("read(%d, ..., %zd): %m", source->fd,
77 source->size - source->filled);
/* only the n bytes just read need to be searched */
82 c = memchr(source->buf + source->filled, '\n', n);
/* length of the line, newline included */
90 *size = c + 1 - source->buf;
92 /* Check if something remains */
/* number of buffered bytes that follow the newline */
93 remain = source->buf + source->filled - c - 1;
/* the leftover buffer is never smaller than LINE_CHUNK */
96 newsize = MAX(remain, LINE_CHUNK);
97 newbuf = malloc(newsize);
100 memcpy(newbuf, c + 1, remain);
/* the leftover buffer replaces the source's buffer */
102 source->buf = newbuf;
103 source->size = newsize;
104 source->filled = remain;
/* Append `size` bytes from `data` to the source's buffer.
 *
 * The buffer is grown with GREEDY_REALLOC to hold source->filled + size
 * bytes, the bytes are copied behind the data already buffered and
 * source->filled is advanced accordingly.
 *
 * Must not be called once the source is in STATE_EOF (asserted).
 *
 * NOTE(review): the return statements, including the one taken when the
 * reallocation fails, are not visible in this excerpt. */
109 int push_data(RemoteSource *source, const char *data, size_t size) {
111 assert(source->state != STATE_EOF);
113 if (!GREEDY_REALLOC(source->buf, source->size,
114 source->filled + size))
/* copy behind the data that is already buffered */
117 memcpy(source->buf + source->filled, data, size);
118 source->filled += size;
/* Collect at least `size` bytes in the source's buffer, reading from
 * source->fd as long as fewer than `size` bytes are buffered.
 *
 * Each read() may fill the whole free tail of the buffer, so more than
 * `size` bytes can end up buffered; the surplus is copied into a newly
 * allocated buffer (at least LINE_CHUNK bytes) that becomes source->buf.
 *
 * Only callable in STATE_DATA_START, STATE_DATA or STATE_DATA_FINISH, and
 * with size <= DATA_SIZE_MAX (both asserted).
 *
 * NOTE(review): the return statements and the hand-off of the first `size`
 * bytes are not visible in this excerpt - presumably the previous buffer is
 * passed to the caller through *data; confirm against the full file. */
123 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
126 size_t newsize = 0, remain;
129 assert(source->state == STATE_DATA_START ||
130 source->state == STATE_DATA ||
131 source->state == STATE_DATA_FINISH);
132 assert(size <= DATA_SIZE_MAX);
133 assert(source->filled <= source->size);
134 assert(source->buf != NULL || source->size == 0);
135 assert(source->buf == NULL || source->size > 0);
/* keep reading until the requested amount is buffered */
138 while(source->filled < size) {
140 /* we have to wait for some data to come to us */
/* ask for room for `size` bytes in total */
143 if (!GREEDY_REALLOC(source->buf, source->size, size))
/* read into the free tail of the buffer */
146 n = read(source->fd, source->buf + source->filled,
147 source->size - source->filled);
/* EAGAIN/EWOULDBLOCK are not logged as errors.
 * NOTE(review): the logged length looks like a difference of unsigned
 * sizes, for which %zu rather than %zd would be the matching conversion -
 * confirm the member types. */
149 if (errno != EAGAIN && errno != EWOULDBLOCK)
150 log_error("read(%d, ..., %zd): %m", source->fd,
151 source->size - source->filled);
161 /* Check if something remains */
162 assert(size <= source->filled);
/* number of buffered bytes beyond the requested chunk */
163 remain = source->filled - size;
/* the leftover buffer is never smaller than LINE_CHUNK */
165 newsize = MAX(remain, LINE_CHUNK);
166 newbuf = malloc(newsize);
169 memcpy(newbuf, source->buf + size, remain);
/* the leftover buffer replaces the source's buffer */
171 source->buf = newbuf;
172 source->size = newsize;
173 source->filled = remain;
/* Read the 64-bit size that introduces a binary field and store it,
 * converted from little-endian to host order, in source->data_size.
 *
 * Only callable in STATE_DATA_START with data_size still 0 (asserted).
 * A size above DATA_SIZE_MAX is logged as an error and a size of zero is
 * logged as a warning.
 *
 * NOTE(review): the return statements, including the one for an oversized
 * field, are not visible in this excerpt. */
178 static int get_data_size(RemoteSource *source) {
180 void _cleanup_free_ *data = NULL;
183 assert(source->state == STATE_DATA_START);
184 assert(source->data_size == 0);
/* request sizeof(uint64_t) bytes holding the size */
186 r = fill_fixed_size(source, &data, sizeof(uint64_t));
/* the size is interpreted as little-endian */
190 source->data_size = le64toh( *(uint64_t *) data );
191 if (source->data_size > DATA_SIZE_MAX) {
192 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
193 source->data_size, DATA_SIZE_MAX);
/* an empty binary field is only warned about */
196 if (source->data_size == 0)
197 log_warning("Binary field with zero length");
/* Fetch the payload of a binary field: requests source->data_size bytes
 * from fill_fixed_size(), passing the caller's `data` pointer through.
 *
 * Only callable in STATE_DATA (asserted).
 *
 * NOTE(review): the handling of r and the return statements are not
 * visible in this excerpt. */
202 static int get_data_data(RemoteSource *source, void **data) {
207 assert(source->state == STATE_DATA);
209 r = fill_fixed_size(source, data, source->data_size);
/* Consume the single byte that follows the payload of a binary field.
 *
 * Only callable in STATE_DATA_FINISH (asserted). One byte is requested from
 * fill_fixed_size(); an error naming the byte that was read is logged with
 * the message "expected newline".
 *
 * NOTE(review): the comparison that triggers the error and the return
 * statements are not visible in this excerpt. */
216 static int get_data_newline(RemoteSource *source) {
218 char _cleanup_free_ *data = NULL;
221 assert(source->state == STATE_DATA_FINISH);
/* request exactly one byte */
223 r = fill_fixed_size(source, (void**) &data, 1);
229 log_error("expected newline, got '%c'", *data);
/* Handle metadata lines, i.e. lines whose name starts with "__".
 *
 * The line (which must end in '\n', asserted) is matched against known
 * prefixes in turn:
 *   __CURSOR=               ignored
 *   __REALTIME_TIMESTAMP=   parsed with safe_atollu(), stored in
 *                           source->ts.realtime
 *   __MONOTONIC_TIMESTAMP=  parsed with safe_atollu(), stored in
 *                           source->ts.monotonic
 *   any other "__" prefix   logged at notice level
 *
 * For the two timestamp lines the function returns the negative error from
 * safe_atollu() when there is one, and 1 otherwise.
 *
 * NOTE(review): the conditions testing the startswith() results and the
 * remaining return statements are not visible in this excerpt. */
236 static int process_dunder(RemoteSource *source, char *line, size_t n) {
/* reused to hold the result of each startswith() call below */
237 const char *timestamp;
242 assert(line[n-1] == '\n');
244 /* XXX: is it worth to support timestamps in extended format?
245 * We don't produce them, but who knows... */
247 timestamp = startswith(line, "__CURSOR=");
249 /* ignore __CURSOR */
252 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
254 long long unsigned x;
256 r = safe_atollu(timestamp, &x);
/* a value that cannot be parsed is only warned about */
258 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
260 source->ts.realtime = x;
261 return r < 0 ? r : 1;
264 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
266 long long unsigned x;
268 r = safe_atollu(timestamp, &x);
/* a value that cannot be parsed is only warned about */
270 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
272 source->ts.monotonic = x;
273 return r < 0 ? r : 1;
/* catch-all for metadata lines that are not recognized */
276 timestamp = startswith(line, "__");
278 log_notice("Unknown dunder line %s", line);
/* Advance the parser by one step, dispatching on source->state.
 *
 * Steps visible in this excerpt:
 *   - line handling: fetch a line with get_line(); an empty line means the
 *     event is ready; process_dunder() is given the line; otherwise the
 *     line is searched for '=' and either stored in source->iovw or the
 *     state moves to STATE_DATA_START for a binary field;
 *   - STATE_DATA_START: read the field size, then continue with STATE_DATA
 *     when it is non-zero or STATE_DATA_FINISH when it is zero;
 *   - binary payload: read data_size bytes, store them in source->iovw and
 *     move to STATE_DATA_FINISH;
 *   - STATE_DATA_FINISH: consume the trailing byte, reset data_size and go
 *     back to STATE_LINE.
 * Several paths switch the source to STATE_EOF. A return value of 0 is
 * annotated as "continue" by the code.
 *
 * NOTE(review): some case labels, the conditions guarding the STATE_EOF
 * transitions and error paths, and the remaining return statements are not
 * visible in this excerpt - confirm against the full file. */
286 int process_data(RemoteSource *source) {
289 switch(source->state) {
/* line-oriented part; get_line() asserts STATE_LINE */
294 assert(source->data_size == 0);
296 r = get_line(source, &line, &n);
300 source->state = STATE_EOF;
304 assert(line[n-1] == '\n');
/* an empty line terminates the current event */
307 log_debug("Received empty line, event is ready");
/* let process_dunder() deal with "__" metadata lines */
312 r = process_dunder(source, line, n);
315 return r < 0 ? r : 0;
321 LLLLLLLL0011223344...\n
/* look for the '=' separating field name and value */
323 sep = memchr(line, '=', n);
328 /* replace \n with = */
330 log_debug("Received: %.*s", (int) n, line);
/* store the line in the entry being assembled */
332 r = iovw_put(&source->iovw, line, n);
334 log_error("Failed to put line in iovect");
/* a binary field follows: its size is read next */
340 source->state = STATE_DATA_START;
341 return 0; /* continue */
344 case STATE_DATA_START:
345 assert(source->data_size == 0);
347 r = get_data_size(source);
348 log_debug("get_data_size() -> %d", r);
352 source->state = STATE_EOF;
/* a zero-length field has no payload, skip to the trailing byte */
356 source->state = source->data_size > 0 ?
357 STATE_DATA : STATE_DATA_FINISH;
359 return 0; /* continue */
/* binary payload; get_data_data() asserts STATE_DATA */
364 assert(source->data_size > 0);
366 r = get_data_data(source, &data);
367 log_debug("get_data_data() -> %d", r);
371 source->state = STATE_EOF;
/* store the payload in the entry being assembled */
377 r = iovw_put(&source->iovw, data, source->data_size);
379 log_error("failed to put binary buffer in iovect");
383 source->state = STATE_DATA_FINISH;
385 return 0; /* continue */
388 case STATE_DATA_FINISH:
389 r = get_data_newline(source);
390 log_debug("get_data_newline() -> %d", r);
394 source->state = STATE_EOF;
/* binary field complete: reset and go back to reading lines */
398 source->data_size = 0;
399 source->state = STATE_LINE;
401 return 0; /* continue */
403 assert_not_reached("wtf?");
/* Drive the parser for one source and write out a completed event.
 *
 * process_data() is called first. Once a full event has been received it
 * is announced at info level; an event whose iovec is empty is skipped with
 * a warning, otherwise the iovec is passed to writer_write() together with
 * the timestamps in source->ts and the compress/seal flags. A failed write
 * is logged with the size of the entry and the error text. The contents of
 * the iovec are released with iovw_free_contents().
 *
 * NOTE(review): the checks on r and the return statements are not visible
 * in this excerpt. */
407 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
413 r = process_data(source);
417 /* We have a full event */
418 log_info("Received a full event from source@%p fd:%d (%s)",
419 source, source->fd, source->name);
/* nothing was collected for this entry */
421 if (!source->iovw.count) {
422 log_warning("Entry with no payload, skipping");
426 assert(source->iovw.iovec);
427 assert(source->iovw.count);
/* hand the assembled entry to the writer */
429 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
/* writer_write() reports failure as a negative errno-style value, hence strerror(-r) */
431 log_error("Failed to write entry of %zu bytes: %s",
432 iovw_size(&source->iovw), strerror(-r));
/* release the entry's contents */
437 iovw_free_contents(&source->iovw);