chiark / gitweb /
Remove src/journal
[elogind.git] / src / journal-remote / journal-remote-parse.c
index 47ff368e6c0ff7ddf9bfea83513a817dca3d7a44..5ff05d3ad6182661f95c21b3a297af9eff00d5cf 100644 (file)
@@ -37,10 +37,11 @@ void source_free(RemoteSource *source) {
         free(source->buf);
         iovw_free_contents(&source->iovw);
 
-        log_debug("Writer ref count %u", source->writer->n_ref);
+        log_debug("Writer ref count %i", source->writer->n_ref);
         writer_unref(source->writer);
 
         sd_event_source_unref(source->event);
+        sd_event_source_unref(source->buffer_event);
 
         free(source);
 }
@@ -111,22 +112,27 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
 
                 if (source->passive_fd)
                         /* we have to wait for some data to come to us */
-                        return -EWOULDBLOCK;
+                        return -EAGAIN;
 
+                /* We know that source->filled is at most DATA_SIZE_MAX, so if
+                   we reallocate it, we'll increase the size at least a bit. */
+                assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
                 if (source->size - source->filled < LINE_CHUNK &&
-                    !realloc_buffer(source,
-                                    MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
+                    !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
                                 return log_oom();
 
+                assert(source->buf);
                 assert(source->size - source->filled >= LINE_CHUNK ||
                        source->size == ENTRY_SIZE_MAX);
 
-                n = read(source->fd, source->buf + source->filled,
+                n = read(source->fd,
+                         source->buf + source->filled,
                          source->size - source->filled);
                 if (n < 0) {
-                        if (errno != EAGAIN && errno != EWOULDBLOCK)
-                                log_error("read(%d, ..., %zd): %m", source->fd,
-                                          source->size - source->filled);
+                        if (errno != EAGAIN)
+                                log_error_errno(errno, "read(%d, ..., %zu): %m",
+                                                source->fd,
+                                                source->size - source->filled);
                         return -errno;
                 } else if (n == 0)
                         return 0;
@@ -177,7 +183,7 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
 
                 if (source->passive_fd)
                         /* we have to wait for some data to come to us */
-                        return -EWOULDBLOCK;
+                        return -EAGAIN;
 
                 if (!realloc_buffer(source, source->offset + size))
                         return log_oom();
@@ -185,9 +191,9 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
                 n = read(source->fd, source->buf + source->filled,
                          source->size - source->filled);
                 if (n < 0) {
-                        if (errno != EAGAIN && errno != EWOULDBLOCK)
-                                log_error("read(%d, ..., %zd): %m", source->fd,
-                                          source->size - source->filled);
+                        if (errno != EAGAIN)
+                                log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
+                                                source->size - source->filled);
                         return -errno;
                 } else if (n == 0)
                         return 0;
@@ -309,13 +315,13 @@ static int process_dunder(RemoteSource *source, char *line, size_t n) {
         return 0;
 }
 
-int process_data(RemoteSource *source) {
+static int process_data(RemoteSource *source) {
         int r;
 
         switch(source->state) {
         case STATE_LINE: {
                 char *line, *sep;
-                size_t n;
+                size_t n = 0;
 
                 assert(source->data_size == 0);
 
@@ -344,22 +350,25 @@ int process_data(RemoteSource *source) {
                    LLLLLLLL0011223344...\n
                 */
                 sep = memchr(line, '=', n);
-                if (sep)
+                if (sep) {
                         /* chomp newline */
                         n--;
-                else
+
+                        r = iovw_put(&source->iovw, line, n);
+                        if (r < 0)
+                                return r;
+                } else {
                         /* replace \n with = */
                         line[n-1] = '=';
-                log_trace("Received: %.*s", (int) n, line);
 
-                r = iovw_put(&source->iovw, line, n);
-                if (r < 0) {
-                        log_error("Failed to put line in iovect");
-                        return r;
+                        source->field_len = n;
+                        source->state = STATE_DATA_START;
+
+                        /* we cannot put the field in iovec until we have all data */
                 }
 
-                if (!sep)
-                        source->state = STATE_DATA_START;
+                log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
+
                 return 0; /* continue */
         }
 
@@ -382,6 +391,7 @@ int process_data(RemoteSource *source) {
 
         case STATE_DATA: {
                 void *data;
+                char *field;
 
                 assert(source->data_size > 0);
 
@@ -396,11 +406,12 @@ int process_data(RemoteSource *source) {
 
                 assert(data);
 
-                r = iovw_put(&source->iovw, data, source->data_size);
-                if (r < 0) {
-                        log_error("failed to put binary buffer in iovect");
+                field = (char*) data - sizeof(uint64_t) - source->field_len;
+                memmove(field + sizeof(uint64_t), field, source->field_len);
+
+                r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
+                if (r < 0)
                         return r;
-                }
 
                 source->state = STATE_DATA_FINISH;
 
@@ -438,7 +449,7 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
                 return r;
 
         /* We have a full event */
-        log_trace("Received full event from source@%p fd:%d (%s)",
+        log_trace("Received full event from source@%p fd:%d (%s)",
                   source, source->fd, source->name);
 
         if (!source->iovw.count) {