free(source->buf);
iovw_free_contents(&source->iovw);
- log_debug("Writer ref count %u", source->writer->n_ref);
+ log_debug("Writer ref count %i", source->writer->n_ref);
writer_unref(source->writer);
sd_event_source_unref(source->event);
+ sd_event_source_unref(source->buffer_event);
free(source);
}
if (source->passive_fd)
/* we have to wait for some data to come to us */
- return -EWOULDBLOCK;
+ return -EAGAIN;
+ /* We know that source->filled is at most DATA_SIZE_MAX, so if
+ we reallocate it, we'll increase the size at least a bit. */
+ assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
if (source->size - source->filled < LINE_CHUNK &&
- !realloc_buffer(source,
- MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
+ !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
return log_oom();
+ assert(source->buf);
assert(source->size - source->filled >= LINE_CHUNK ||
source->size == ENTRY_SIZE_MAX);
- n = read(source->fd, source->buf + source->filled,
+ n = read(source->fd,
+ source->buf + source->filled,
source->size - source->filled);
if (n < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK)
- log_error("read(%d, ..., %zd): %m", source->fd,
- source->size - source->filled);
+ if (errno != EAGAIN)
+ log_error_errno(errno, "read(%d, ..., %zu): %m",
+ source->fd,
+ source->size - source->filled);
return -errno;
} else if (n == 0)
return 0;
if (source->passive_fd)
/* we have to wait for some data to come to us */
- return -EWOULDBLOCK;
+ return -EAGAIN;
if (!realloc_buffer(source, source->offset + size))
return log_oom();
n = read(source->fd, source->buf + source->filled,
source->size - source->filled);
if (n < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK)
- log_error("read(%d, ..., %zd): %m", source->fd,
- source->size - source->filled);
+ if (errno != EAGAIN)
+ log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
+ source->size - source->filled);
return -errno;
} else if (n == 0)
return 0;
return 0;
}
-int process_data(RemoteSource *source) {
+static int process_data(RemoteSource *source) {
int r;
switch(source->state) {
assert(line[n-1] == '\n');
if (n == 1) {
- log_debug("Received empty line, event is ready");
+ log_trace("Received empty line, event is ready");
return 1;
}
LLLLLLLL0011223344...\n
*/
sep = memchr(line, '=', n);
- if (sep)
+ if (sep) {
/* chomp newline */
n--;
- else
+
+ r = iovw_put(&source->iovw, line, n);
+ if (r < 0)
+ return r;
+ } else {
/* replace \n with = */
line[n-1] = '=';
- log_debug("Received: %.*s", (int) n, line);
- r = iovw_put(&source->iovw, line, n);
- if (r < 0) {
- log_error("Failed to put line in iovect");
- return r;
+ source->field_len = n;
+ source->state = STATE_DATA_START;
+
+ /* we cannot put the field in iovec until we have all data */
}
- if (!sep)
- source->state = STATE_DATA_START;
+ log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
+
return 0; /* continue */
}
assert(source->data_size == 0);
r = get_data_size(source);
- log_debug("get_data_size() -> %d", r);
+ // log_debug("get_data_size() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
case STATE_DATA: {
void *data;
+ char *field;
assert(source->data_size > 0);
r = get_data_data(source, &data);
- log_debug("get_data_data() -> %d", r);
+ // log_debug("get_data_data() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
assert(data);
- r = iovw_put(&source->iovw, data, source->data_size);
- if (r < 0) {
- log_error("failed to put binary buffer in iovect");
+ field = (char*) data - sizeof(uint64_t) - source->field_len;
+ memmove(field + sizeof(uint64_t), field, source->field_len);
+
+ r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
+ if (r < 0)
return r;
- }
source->state = STATE_DATA_FINISH;
case STATE_DATA_FINISH:
r = get_data_newline(source);
- log_debug("get_data_newline() -> %d", r);
+ // log_debug("get_data_newline() -> %d", r);
if (r < 0)
return r;
if (r == 0) {
return r;
/* We have a full event */
- log_debug("Received a full event from source@%p fd:%d (%s)",
+ log_trace("Received full event from source@%p fd:%d (%s)",
source, source->fd, source->name);
if (!source->iovw.count) {
r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
if (r < 0)
- log_error("Failed to write entry of %zu bytes: %s",
- iovw_size(&source->iovw), strerror(-r));
+ log_error_errno(r, "Failed to write entry of %zu bytes: %m",
+ iovw_size(&source->iovw));
else
r = 1;
char *tmp;
tmp = realloc(source->buf, target);
- if (tmp)
+ if (!tmp)
log_warning("Failed to reallocate buffer to (smaller) size %zu",
target);
else {