1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
25 #define LINE_CHUNK 1024u
27 void source_free(RemoteSource *source) {
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
37 iovw_free_contents(&source->iovw);
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
48 assert(source->state == STATE_LINE);
49 assert(source->filled <= source->size);
50 assert(source->buf == NULL || source->size > 0);
52 c = memchr(source->buf, '\n', source->filled);
58 /* we have to wait for some data to come to us */
61 if (source->size - source->filled < LINE_CHUNK) {
62 // XXX: add check for maximum line length
64 if (!GREEDY_REALLOC(source->buf, source->size,
65 source->filled + LINE_CHUNK))
68 assert(source->size - source->filled >= LINE_CHUNK);
70 n = read(source->fd, source->buf + source->filled,
71 source->size - source->filled);
73 if (errno != EAGAIN && errno != EWOULDBLOCK)
74 log_error("read(%d, ..., %zd): %m", source->fd,
75 source->size - source->filled);
80 c = memchr(source->buf + source->filled, '\n', n);
88 *size = c + 1 - source->buf;
90 /* Check if something remains */
91 remain = source->buf + source->filled - c - 1;
94 newsize = MAX(remain, LINE_CHUNK);
95 newbuf = malloc(newsize);
98 memcpy(newbuf, c + 1, remain);
100 source->buf = newbuf;
101 source->size = newsize;
102 source->filled = remain;
107 int push_data(RemoteSource *source, const char *data, size_t size) {
109 assert(source->state != STATE_EOF);
111 if (!GREEDY_REALLOC(source->buf, source->size,
112 source->filled + size))
115 memcpy(source->buf + source->filled, data, size);
116 source->filled += size;
121 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
124 size_t newsize = 0, remain;
127 assert(source->state == STATE_DATA_START ||
128 source->state == STATE_DATA ||
129 source->state == STATE_DATA_FINISH);
130 assert(size <= DATA_SIZE_MAX);
131 assert(source->filled <= source->size);
132 assert(source->buf != NULL || source->size == 0);
133 assert(source->buf == NULL || source->size > 0);
136 while(source->filled < size) {
138 /* we have to wait for some data to come to us */
141 if (!GREEDY_REALLOC(source->buf, source->size, size))
144 n = read(source->fd, source->buf + source->filled,
145 source->size - source->filled);
147 if (errno != EAGAIN && errno != EWOULDBLOCK)
148 log_error("read(%d, ..., %zd): %m", source->fd,
149 source->size - source->filled);
159 /* Check if something remains */
160 assert(size <= source->filled);
161 remain = source->filled - size;
163 newsize = MAX(remain, LINE_CHUNK);
164 newbuf = malloc(newsize);
167 memcpy(newbuf, source->buf + size, remain);
169 source->buf = newbuf;
170 source->size = newsize;
171 source->filled = remain;
176 static int get_data_size(RemoteSource *source) {
178 void _cleanup_free_ *data = NULL;
181 assert(source->state == STATE_DATA_START);
182 assert(source->data_size == 0);
184 r = fill_fixed_size(source, &data, sizeof(uint64_t));
188 source->data_size = le64toh( *(uint64_t *) data );
189 if (source->data_size > DATA_SIZE_MAX) {
190 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
191 source->data_size, DATA_SIZE_MAX);
194 if (source->data_size == 0)
195 log_warning("Binary field with zero length");
200 static int get_data_data(RemoteSource *source, void **data) {
205 assert(source->state == STATE_DATA);
207 r = fill_fixed_size(source, data, source->data_size);
214 static int get_data_newline(RemoteSource *source) {
216 char _cleanup_free_ *data = NULL;
219 assert(source->state == STATE_DATA_FINISH);
221 r = fill_fixed_size(source, (void**) &data, 1);
227 log_error("expected newline, got '%c'", *data);
234 static int process_dunder(RemoteSource *source, char *line, size_t n) {
235 const char *timestamp;
240 assert(line[n-1] == '\n');
242 /* XXX: is it worth to support timestamps in extended format?
243 * We don't produce them, but who knows... */
245 timestamp = startswith(line, "__CURSOR=");
247 /* ignore __CURSOR */
250 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
252 long long unsigned x;
254 r = safe_atollu(timestamp, &x);
256 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
258 source->ts.realtime = x;
259 return r < 0 ? r : 1;
262 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
264 long long unsigned x;
266 r = safe_atollu(timestamp, &x);
268 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
270 source->ts.monotonic = x;
271 return r < 0 ? r : 1;
274 timestamp = startswith(line, "__");
276 log_notice("Unknown dunder line %s", line);
284 int process_data(RemoteSource *source) {
287 switch(source->state) {
292 assert(source->data_size == 0);
294 r = get_line(source, &line, &n);
298 source->state = STATE_EOF;
302 assert(line[n-1] == '\n');
305 log_debug("Received empty line, event is ready");
310 r = process_dunder(source, line, n);
313 return r < 0 ? r : 0;
319 LLLLLLLL0011223344...\n
321 sep = memchr(line, '=', n);
326 /* replace \n with = */
328 log_debug("Received: %.*s", (int) n, line);
330 r = iovw_put(&source->iovw, line, n);
332 log_error("Failed to put line in iovect");
338 source->state = STATE_DATA_START;
339 return 0; /* continue */
342 case STATE_DATA_START:
343 assert(source->data_size == 0);
345 r = get_data_size(source);
346 log_debug("get_data_size() -> %d", r);
350 source->state = STATE_EOF;
354 source->state = source->data_size > 0 ?
355 STATE_DATA : STATE_DATA_FINISH;
357 return 0; /* continue */
362 assert(source->data_size > 0);
364 r = get_data_data(source, &data);
365 log_debug("get_data_data() -> %d", r);
369 source->state = STATE_EOF;
375 r = iovw_put(&source->iovw, data, source->data_size);
377 log_error("failed to put binary buffer in iovect");
381 source->state = STATE_DATA_FINISH;
383 return 0; /* continue */
386 case STATE_DATA_FINISH:
387 r = get_data_newline(source);
388 log_debug("get_data_newline() -> %d", r);
392 source->state = STATE_EOF;
396 source->data_size = 0;
397 source->state = STATE_LINE;
399 return 0; /* continue */
401 assert_not_reached("wtf?");
405 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
411 r = process_data(source);
415 /* We have a full event */
416 log_info("Received a full event from source@%p fd:%d (%s)",
417 source, source->fd, source->name);
419 if (!source->iovw.count) {
420 log_warning("Entry with no payload, skipping");
424 assert(source->iovw.iovec);
425 assert(source->iovw.count);
427 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
429 log_error("Failed to write entry of %zu bytes: %s",
430 iovw_size(&source->iovw), strerror(-r));
435 iovw_free_contents(&source->iovw);