#include "journal-remote-parse.h"
#include "journald-native.h"
-#define LINE_CHUNK 1024u
+#define LINE_CHUNK 8*1024u
void source_free(RemoteSource *source) {
if (!source)
return;
- if (source->fd >= 0) {
+ if (source->fd >= 0 && !source->passive_fd) {
log_debug("Closing fd:%d (%s)", source->fd, source->name);
- close(source->fd);
+ safe_close(source->fd);
}
+
free(source->name);
free(source->buf);
iovw_free_contents(&source->iovw);
+
+ log_debug("Writer ref count %u", source->writer->n_ref);
+ writer_unref(source->writer);
+
+ sd_event_source_unref(source->event);
+
free(source);
}
+/**
+ * Initialize zero-filled source with given values. On success, takes
+ * ownerhship of fd and writer, otherwise does not touch them.
+ */
+RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
+
+ RemoteSource *source;
+
+ log_debug("Creating source for %sfd:%d (%s)",
+ passive_fd ? "passive " : "", fd, name);
+
+ assert(fd >= 0);
+
+ source = new0(RemoteSource, 1);
+ if (!source)
+ return NULL;
+
+ source->fd = fd;
+ source->passive_fd = passive_fd;
+ source->name = name;
+ source->writer = writer;
+
+ return source;
+}
+
static int get_line(RemoteSource *source, char **line, size_t *size) {
ssize_t n, remain;
char *c = NULL;
assert(source->state == STATE_LINE);
assert(source->filled <= source->size);
assert(source->buf == NULL || source->size > 0);
+ assert(source->fd >= 0);
+
+ while (true) {
+ if (source->buf)
+ c = memchr(source->buf + source->scanned, '\n',
+ source->filled - source->scanned);
+ if (c != NULL)
+ break;
+
+ source->scanned = source->filled;
+ if (source->scanned >= DATA_SIZE_MAX) {
+ log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
+ return -E2BIG;
+ }
- if (source->buf)
- c = memchr(source->buf, '\n', source->filled);
-
- if (c != NULL)
- goto docopy;
-
- resize:
- if (source->fd < 0)
- /* we have to wait for some data to come to us */
- return -EWOULDBLOCK;
+ if (source->passive_fd)
+ /* we have to wait for some data to come to us */
+ return -EWOULDBLOCK;
- if (source->size - source->filled < LINE_CHUNK) {
- // XXX: add check for maximum line length
+ if (source->size - source->filled < LINE_CHUNK &&
+ !GREEDY_REALLOC(source->buf, source->size,
+ MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
+ return log_oom();
- if (!GREEDY_REALLOC(source->buf, source->size,
- source->filled + LINE_CHUNK))
- return log_oom();
- }
- assert(source->size - source->filled >= LINE_CHUNK);
+ assert(source->size - source->filled >= LINE_CHUNK ||
+ source->size == DATA_SIZE_MAX);
- n = read(source->fd, source->buf + source->filled,
- source->size - source->filled);
- if (n < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK)
- log_error("read(%d, ..., %zd): %m", source->fd,
- source->size - source->filled);
- return -errno;
- } else if (n == 0)
- return 0;
+ // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
+ // to accomodate such big fields.
- c = memchr(source->buf + source->filled, '\n', n);
- source->filled += n;
+ n = read(source->fd, source->buf + source->filled,
+ source->size - source->filled);
+ if (n < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ log_error("read(%d, ..., %zd): %m", source->fd,
+ source->size - source->filled);
+ return -errno;
+ } else if (n == 0)
+ return 0;
- if (c == NULL)
- goto resize;
+ source->filled += n;
+ }
- docopy:
*line = source->buf;
*size = c + 1 - source->buf;
source->buf = newbuf;
source->size = newsize;
source->filled = remain;
+ source->scanned = 0;
return 1;
}
assert(source->state != STATE_EOF);
if (!GREEDY_REALLOC(source->buf, source->size,
- source->filled + size))
- return log_oom();
+ source->filled + size)) {
+ log_error("Failed to store received data of size %zu "
+ "(in addition to existing %zu bytes with %zu filled): %s",
+ size, source->size, source->filled, strerror(ENOMEM));
+ return -ENOMEM;
+ }
memcpy(source->buf + source->filled, data, size);
source->filled += size;
source->state == STATE_DATA_FINISH);
assert(size <= DATA_SIZE_MAX);
assert(source->filled <= source->size);
+ assert(source->scanned <= source->filled);
assert(source->buf != NULL || source->size == 0);
assert(source->buf == NULL || source->size > 0);
+ assert(source->fd >= 0);
assert(data);
while(source->filled < size) {
- if (source->fd < 0)
+ if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
source->buf = newbuf;
source->size = newsize;
source->filled = remain;
+ source->scanned = 0;
return 1;
}
source->data_size = le64toh( *(uint64_t *) data );
if (source->data_size > DATA_SIZE_MAX) {
- log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
+ log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
source->data_size, DATA_SIZE_MAX);
return -EINVAL;
}
}
}
-int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
+int process_source(RemoteSource *source, bool compress, bool seal) {
int r;
assert(source);
- assert(writer);
+ assert(source->writer);
r = process_data(source);
if (r <= 0)
return r;
/* We have a full event */
- log_info("Received a full event from source@%p fd:%d (%s)",
- source, source->fd, source->name);
+ log_debug("Received a full event from source@%p fd:%d (%s)",
+ source, source->fd, source->name);
if (!source->iovw.count) {
log_warning("Entry with no payload, skipping");
assert(source->iovw.iovec);
assert(source->iovw.count);
- r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
+ r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
if (r < 0)
log_error("Failed to write entry of %zu bytes: %s",
iovw_size(&source->iovw), strerror(-r));