1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
25 #define LINE_CHUNK 1024u
27 void source_free(RemoteSource *source) {
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
37 iovw_free_contents(&source->iovw);
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
48 assert(source->state == STATE_LINE);
49 assert(source->filled <= source->size);
50 assert(source->buf == NULL || source->size > 0);
52 c = memchr(source->buf, '\n', source->filled);
57 if (source->size - source->filled < LINE_CHUNK) {
58 // XXX: add check for maximum line length
60 if (!GREEDY_REALLOC(source->buf, source->size,
61 source->filled + LINE_CHUNK))
64 assert(source->size - source->filled >= LINE_CHUNK);
66 n = read(source->fd, source->buf + source->filled,
67 source->size - source->filled);
69 if (errno != EAGAIN && errno != EWOULDBLOCK)
70 log_error("read(%d, ..., %zd): %m", source->fd,
71 source->size - source->filled);
76 c = memchr(source->buf + source->filled, '\n', n);
84 *size = c + 1 - source->buf;
86 /* Check if something remains */
87 remain = source->buf + source->filled - c - 1;
90 newsize = MAX(remain, LINE_CHUNK);
91 newbuf = malloc(newsize);
94 memcpy(newbuf, c + 1, remain);
97 source->size = newsize;
98 source->filled = remain;
103 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
106 size_t newsize = 0, remain;
109 assert(source->state == STATE_DATA_START ||
110 source->state == STATE_DATA ||
111 source->state == STATE_DATA_FINISH);
112 assert(size <= DATA_SIZE_MAX);
113 assert(source->filled <= source->size);
114 assert(source->buf != NULL || source->size == 0);
115 assert(source->buf == NULL || source->size > 0);
118 while(source->filled < size) {
119 if (!GREEDY_REALLOC(source->buf, source->size, size))
122 n = read(source->fd, source->buf + source->filled,
123 source->size - source->filled);
125 if (errno != EAGAIN && errno != EWOULDBLOCK)
126 log_error("read(%d, ..., %zd): %m", source->fd,
127 source->size - source->filled);
137 /* Check if something remains */
138 assert(size <= source->filled);
139 remain = source->filled - size;
141 newsize = MAX(remain, LINE_CHUNK);
142 newbuf = malloc(newsize);
145 memcpy(newbuf, source->buf + size, remain);
147 source->buf = newbuf;
148 source->size = newsize;
149 source->filled = remain;
154 static int get_data_size(RemoteSource *source) {
156 void _cleanup_free_ *data = NULL;
159 assert(source->state == STATE_DATA_START);
160 assert(source->data_size == 0);
162 r = fill_fixed_size(source, &data, sizeof(uint64_t));
166 source->data_size = le64toh( *(uint64_t *) data );
167 if (source->data_size > DATA_SIZE_MAX) {
168 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
169 source->data_size, DATA_SIZE_MAX);
172 if (source->data_size == 0)
173 log_warning("Binary field with zero length");
178 static int get_data_data(RemoteSource *source, void **data) {
183 assert(source->state == STATE_DATA);
185 r = fill_fixed_size(source, data, source->data_size);
192 static int get_data_newline(RemoteSource *source) {
194 char _cleanup_free_ *data = NULL;
197 assert(source->state == STATE_DATA_FINISH);
199 r = fill_fixed_size(source, (void**) &data, 1);
205 log_error("expected newline, got '%c'", *data);
212 static int process_dunder(RemoteSource *source, char *line, size_t n) {
213 const char *timestamp;
218 assert(line[n-1] == '\n');
220 /* XXX: is it worth to support timestamps in extended format?
221 * We don't produce them, but who knows... */
223 timestamp = startswith(line, "__CURSOR=");
225 /* ignore __CURSOR */
228 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
230 long long unsigned x;
232 r = safe_atollu(timestamp, &x);
234 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
236 source->ts.realtime = x;
237 return r < 0 ? r : 1;
240 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
242 long long unsigned x;
244 r = safe_atollu(timestamp, &x);
246 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
248 source->ts.monotonic = x;
249 return r < 0 ? r : 1;
252 timestamp = startswith(line, "__");
254 log_notice("Unknown dunder line %s", line);
262 int process_data(RemoteSource *source) {
265 switch(source->state) {
270 assert(source->data_size == 0);
272 r = get_line(source, &line, &n);
276 source->state = STATE_EOF;
280 assert(line[n-1] == '\n');
283 log_debug("Received empty line, event is ready");
288 r = process_dunder(source, line, n);
291 return r < 0 ? r : 0;
297 LLLLLLLL0011223344...\n
299 sep = memchr(line, '=', n);
304 /* replace \n with = */
306 log_debug("Received: %.*s", (int) n, line);
308 r = iovw_put(&source->iovw, line, n);
310 log_error("Failed to put line in iovect");
316 source->state = STATE_DATA_START;
317 return 0; /* continue */
320 case STATE_DATA_START:
321 assert(source->data_size == 0);
323 r = get_data_size(source);
324 log_debug("get_data_size() -> %d", r);
328 source->state = STATE_EOF;
332 source->state = source->data_size > 0 ?
333 STATE_DATA : STATE_DATA_FINISH;
335 return 0; /* continue */
340 assert(source->data_size > 0);
342 r = get_data_data(source, &data);
343 log_debug("get_data_data() -> %d", r);
347 source->state = STATE_EOF;
353 r = iovw_put(&source->iovw, data, source->data_size);
355 log_error("failed to put binary buffer in iovect");
359 source->state = STATE_DATA_FINISH;
361 return 0; /* continue */
364 case STATE_DATA_FINISH:
365 r = get_data_newline(source);
366 log_debug("get_data_newline() -> %d", r);
370 source->state = STATE_EOF;
374 source->data_size = 0;
375 source->state = STATE_LINE;
377 return 0; /* continue */
379 assert_not_reached("wtf?");
383 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
389 r = process_data(source);
393 /* We have a full event */
394 log_info("Received a full event from source@%p fd:%d (%s)",
395 source, source->fd, source->name);
397 if (!source->iovw.count) {
398 log_warning("Entry with no payload, skipping");
402 assert(source->iovw.iovec);
403 assert(source->iovw.count);
405 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
407 log_error("Failed to write entry of %zu bytes: %s",
408 iovw_size(&source->iovw), strerror(-r));
413 iovw_free_contents(&source->iovw);