1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
25 #define LINE_CHUNK 8*1024u
27 void source_free(RemoteSource *source) {
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
38 iovw_free_contents(&source->iovw);
40 log_debug("Writer ref count %u", source->writer->n_ref);
41 writer_unref(source->writer);
43 sd_event_source_unref(source->event);
49 * Initialize zero-filled source with given values. On success, takes
50 * ownerhship of fd and writer, otherwise does not touch them.
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
56 log_debug("Creating source for %sfd:%d (%s)",
57 passive_fd ? "passive " : "", fd, name);
61 source = new0(RemoteSource, 1);
66 source->passive_fd = passive_fd;
68 source->writer = writer;
73 static int get_line(RemoteSource *source, char **line, size_t *size) {
80 assert(source->state == STATE_LINE);
81 assert(source->filled <= source->size);
82 assert(source->buf == NULL || source->size > 0);
83 assert(source->fd >= 0);
87 c = memchr(source->buf + source->scanned, '\n',
88 source->filled - source->scanned);
92 source->scanned = source->filled;
93 if (source->scanned >= DATA_SIZE_MAX) {
94 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
98 if (source->passive_fd)
99 /* we have to wait for some data to come to us */
102 if (source->size - source->filled < LINE_CHUNK &&
103 !GREEDY_REALLOC(source->buf, source->size,
104 MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
107 assert(source->size - source->filled >= LINE_CHUNK ||
108 source->size == DATA_SIZE_MAX);
110 // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
111 // to accomodate such big fields.
113 n = read(source->fd, source->buf + source->filled,
114 source->size - source->filled);
116 if (errno != EAGAIN && errno != EWOULDBLOCK)
117 log_error("read(%d, ..., %zd): %m", source->fd,
118 source->size - source->filled);
127 *size = c + 1 - source->buf;
129 /* Check if something remains */
130 remain = source->buf + source->filled - c - 1;
133 newsize = MAX(remain, LINE_CHUNK);
134 newbuf = malloc(newsize);
137 memcpy(newbuf, c + 1, remain);
139 source->buf = newbuf;
140 source->size = newsize;
141 source->filled = remain;
147 int push_data(RemoteSource *source, const char *data, size_t size) {
149 assert(source->state != STATE_EOF);
151 if (!GREEDY_REALLOC(source->buf, source->size,
152 source->filled + size)) {
153 log_error("Failed to store received data of size %zu "
154 "(in addition to existing %zu bytes with %zu filled): %s",
155 size, source->size, source->filled, strerror(ENOMEM));
159 memcpy(source->buf + source->filled, data, size);
160 source->filled += size;
165 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
168 size_t newsize = 0, remain;
171 assert(source->state == STATE_DATA_START ||
172 source->state == STATE_DATA ||
173 source->state == STATE_DATA_FINISH);
174 assert(size <= DATA_SIZE_MAX);
175 assert(source->filled <= source->size);
176 assert(source->scanned <= source->filled);
177 assert(source->buf != NULL || source->size == 0);
178 assert(source->buf == NULL || source->size > 0);
179 assert(source->fd >= 0);
182 while(source->filled < size) {
183 if (source->passive_fd)
184 /* we have to wait for some data to come to us */
187 if (!GREEDY_REALLOC(source->buf, source->size, size))
190 n = read(source->fd, source->buf + source->filled,
191 source->size - source->filled);
193 if (errno != EAGAIN && errno != EWOULDBLOCK)
194 log_error("read(%d, ..., %zd): %m", source->fd,
195 source->size - source->filled);
205 /* Check if something remains */
206 assert(size <= source->filled);
207 remain = source->filled - size;
209 newsize = MAX(remain, LINE_CHUNK);
210 newbuf = malloc(newsize);
213 memcpy(newbuf, source->buf + size, remain);
215 source->buf = newbuf;
216 source->size = newsize;
217 source->filled = remain;
223 static int get_data_size(RemoteSource *source) {
225 _cleanup_free_ void *data = NULL;
228 assert(source->state == STATE_DATA_START);
229 assert(source->data_size == 0);
231 r = fill_fixed_size(source, &data, sizeof(uint64_t));
235 source->data_size = le64toh( *(uint64_t *) data );
236 if (source->data_size > DATA_SIZE_MAX) {
237 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
238 source->data_size, DATA_SIZE_MAX);
241 if (source->data_size == 0)
242 log_warning("Binary field with zero length");
247 static int get_data_data(RemoteSource *source, void **data) {
252 assert(source->state == STATE_DATA);
254 r = fill_fixed_size(source, data, source->data_size);
261 static int get_data_newline(RemoteSource *source) {
263 _cleanup_free_ char *data = NULL;
266 assert(source->state == STATE_DATA_FINISH);
268 r = fill_fixed_size(source, (void**) &data, 1);
274 log_error("expected newline, got '%c'", *data);
281 static int process_dunder(RemoteSource *source, char *line, size_t n) {
282 const char *timestamp;
287 assert(line[n-1] == '\n');
289 /* XXX: is it worth to support timestamps in extended format?
290 * We don't produce them, but who knows... */
292 timestamp = startswith(line, "__CURSOR=");
294 /* ignore __CURSOR */
297 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
299 long long unsigned x;
301 r = safe_atollu(timestamp, &x);
303 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
305 source->ts.realtime = x;
306 return r < 0 ? r : 1;
309 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
311 long long unsigned x;
313 r = safe_atollu(timestamp, &x);
315 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
317 source->ts.monotonic = x;
318 return r < 0 ? r : 1;
321 timestamp = startswith(line, "__");
323 log_notice("Unknown dunder line %s", line);
331 int process_data(RemoteSource *source) {
334 switch(source->state) {
339 assert(source->data_size == 0);
341 r = get_line(source, &line, &n);
345 source->state = STATE_EOF;
349 assert(line[n-1] == '\n');
352 log_debug("Received empty line, event is ready");
357 r = process_dunder(source, line, n);
360 return r < 0 ? r : 0;
366 LLLLLLLL0011223344...\n
368 sep = memchr(line, '=', n);
373 /* replace \n with = */
375 log_debug("Received: %.*s", (int) n, line);
377 r = iovw_put(&source->iovw, line, n);
379 log_error("Failed to put line in iovect");
385 source->state = STATE_DATA_START;
386 return 0; /* continue */
389 case STATE_DATA_START:
390 assert(source->data_size == 0);
392 r = get_data_size(source);
393 log_debug("get_data_size() -> %d", r);
397 source->state = STATE_EOF;
401 source->state = source->data_size > 0 ?
402 STATE_DATA : STATE_DATA_FINISH;
404 return 0; /* continue */
409 assert(source->data_size > 0);
411 r = get_data_data(source, &data);
412 log_debug("get_data_data() -> %d", r);
416 source->state = STATE_EOF;
422 r = iovw_put(&source->iovw, data, source->data_size);
424 log_error("failed to put binary buffer in iovect");
428 source->state = STATE_DATA_FINISH;
430 return 0; /* continue */
433 case STATE_DATA_FINISH:
434 r = get_data_newline(source);
435 log_debug("get_data_newline() -> %d", r);
439 source->state = STATE_EOF;
443 source->data_size = 0;
444 source->state = STATE_LINE;
446 return 0; /* continue */
448 assert_not_reached("wtf?");
452 int process_source(RemoteSource *source, bool compress, bool seal) {
456 assert(source->writer);
458 r = process_data(source);
462 /* We have a full event */
463 log_debug("Received a full event from source@%p fd:%d (%s)",
464 source, source->fd, source->name);
466 if (!source->iovw.count) {
467 log_warning("Entry with no payload, skipping");
471 assert(source->iovw.iovec);
472 assert(source->iovw.count);
474 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
476 log_error("Failed to write entry of %zu bytes: %s",
477 iovw_size(&source->iovw), strerror(-r));
482 iovw_free_contents(&source->iovw);