chiark / gitweb /
journal-remote: process events without delay
[elogind.git] / src / journal-remote / journal-remote-parse.c
index 47ff368e6c0ff7ddf9bfea83513a817dca3d7a44..7e6295435113d6035d48725e97ef93d528cd322e 100644 (file)
@@ -37,10 +37,11 @@ void source_free(RemoteSource *source) {
         free(source->buf);
         iovw_free_contents(&source->iovw);
 
-        log_debug("Writer ref count %u", source->writer->n_ref);
+        log_debug("Writer ref count %i", source->writer->n_ref);
         writer_unref(source->writer);
 
         sd_event_source_unref(source->event);
+        sd_event_source_unref(source->buffer_event);
 
         free(source);
 }
@@ -125,8 +126,8 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
                          source->size - source->filled);
                 if (n < 0) {
                         if (errno != EAGAIN && errno != EWOULDBLOCK)
-                                log_error("read(%d, ..., %zd): %m", source->fd,
-                                          source->size - source->filled);
+                                log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
+                                                source->size - source->filled);
                         return -errno;
                 } else if (n == 0)
                         return 0;
@@ -186,8 +187,8 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
                          source->size - source->filled);
                 if (n < 0) {
                         if (errno != EAGAIN && errno != EWOULDBLOCK)
-                                log_error("read(%d, ..., %zd): %m", source->fd,
-                                          source->size - source->filled);
+                                log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
+                                                source->size - source->filled);
                         return -errno;
                 } else if (n == 0)
                         return 0;
@@ -344,22 +345,25 @@ int process_data(RemoteSource *source) {
                    LLLLLLLL0011223344...\n
                 */
                 sep = memchr(line, '=', n);
-                if (sep)
+                if (sep) {
                         /* chomp newline */
                         n--;
-                else
+
+                        r = iovw_put(&source->iovw, line, n);
+                        if (r < 0)
+                                return r;
+                } else {
                         /* replace \n with = */
                         line[n-1] = '=';
-                log_trace("Received: %.*s", (int) n, line);
 
-                r = iovw_put(&source->iovw, line, n);
-                if (r < 0) {
-                        log_error("Failed to put line in iovect");
-                        return r;
+                        source->field_len = n;
+                        source->state = STATE_DATA_START;
+
+                        /* we cannot put the field in iovec until we have all data */
                 }
 
-                if (!sep)
-                        source->state = STATE_DATA_START;
+                log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
+
                 return 0; /* continue */
         }
 
@@ -382,6 +386,7 @@ int process_data(RemoteSource *source) {
 
         case STATE_DATA: {
                 void *data;
+                char *field;
 
                 assert(source->data_size > 0);
 
@@ -396,11 +401,12 @@ int process_data(RemoteSource *source) {
 
                 assert(data);
 
-                r = iovw_put(&source->iovw, data, source->data_size);
-                if (r < 0) {
-                        log_error("failed to put binary buffer in iovect");
+                field = (char*) data - sizeof(uint64_t) - source->field_len;
+                memmove(field + sizeof(uint64_t), field, source->field_len);
+
+                r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
+                if (r < 0)
                         return r;
-                }
 
                 source->state = STATE_DATA_FINISH;
 
@@ -438,7 +444,7 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
                 return r;
 
         /* We have a full event */
-        log_trace("Received full event from source@%p fd:%d (%s)",
+        log_trace("Received full event from source@%p fd:%d (%s)",
                   source, source->fd, source->name);
 
         if (!source->iovw.count) {