chiark / gitweb /
journal-remote: process events without delay
[elogind.git] / src / journal-remote / journal-remote-parse.c
index dfb87d49af7f88a17b1de4751f751c088f09afac..7e6295435113d6035d48725e97ef93d528cd322e 100644 (file)
@@ -37,10 +37,11 @@ void source_free(RemoteSource *source) {
         free(source->buf);
         iovw_free_contents(&source->iovw);
 
-        log_debug("Writer ref count %u", source->writer->n_ref);
+        log_debug("Writer ref count %i", source->writer->n_ref);
         writer_unref(source->writer);
 
         sd_event_source_unref(source->event);
+        sd_event_source_unref(source->buffer_event);
 
         free(source);
 }
@@ -125,8 +126,8 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
                          source->size - source->filled);
                 if (n < 0) {
                         if (errno != EAGAIN && errno != EWOULDBLOCK)
-                                log_error("read(%d, ..., %zd): %m", source->fd,
-                                          source->size - source->filled);
+                                log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
+                                                source->size - source->filled);
                         return -errno;
                 } else if (n == 0)
                         return 0;
@@ -186,8 +187,8 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
                          source->size - source->filled);
                 if (n < 0) {
                         if (errno != EAGAIN && errno != EWOULDBLOCK)
-                                log_error("read(%d, ..., %zd): %m", source->fd,
-                                          source->size - source->filled);
+                                log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
+                                                source->size - source->filled);
                         return -errno;
                 } else if (n == 0)
                         return 0;
@@ -330,7 +331,7 @@ int process_data(RemoteSource *source) {
                 assert(line[n-1] == '\n');
 
                 if (n == 1) {
-                        log_debug("Received empty line, event is ready");
+                        log_trace("Received empty line, event is ready");
                         return 1;
                 }
 
@@ -344,22 +345,25 @@ int process_data(RemoteSource *source) {
                    LLLLLLLL0011223344...\n
                 */
                 sep = memchr(line, '=', n);
-                if (sep)
+                if (sep) {
                         /* chomp newline */
                         n--;
-                else
+
+                        r = iovw_put(&source->iovw, line, n);
+                        if (r < 0)
+                                return r;
+                } else {
                         /* replace \n with = */
                         line[n-1] = '=';
-                log_debug("Received: %.*s", (int) n, line);
 
-                r = iovw_put(&source->iovw, line, n);
-                if (r < 0) {
-                        log_error("Failed to put line in iovect");
-                        return r;
+                        source->field_len = n;
+                        source->state = STATE_DATA_START;
+
+                        /* we cannot put the field in iovec until we have all data */
                 }
 
-                if (!sep)
-                        source->state = STATE_DATA_START;
+                log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
+
                 return 0; /* continue */
         }
 
@@ -367,7 +371,7 @@ int process_data(RemoteSource *source) {
                 assert(source->data_size == 0);
 
                 r = get_data_size(source);
-                log_debug("get_data_size() -> %d", r);
+                // log_debug("get_data_size() -> %d", r);
                 if (r < 0)
                         return r;
                 if (r == 0) {
@@ -382,11 +386,12 @@ int process_data(RemoteSource *source) {
 
         case STATE_DATA: {
                 void *data;
+                char *field;
 
                 assert(source->data_size > 0);
 
                 r = get_data_data(source, &data);
-                log_debug("get_data_data() -> %d", r);
+                // log_debug("get_data_data() -> %d", r);
                 if (r < 0)
                         return r;
                 if (r == 0) {
@@ -396,11 +401,12 @@ int process_data(RemoteSource *source) {
 
                 assert(data);
 
-                r = iovw_put(&source->iovw, data, source->data_size);
-                if (r < 0) {
-                        log_error("failed to put binary buffer in iovect");
+                field = (char*) data - sizeof(uint64_t) - source->field_len;
+                memmove(field + sizeof(uint64_t), field, source->field_len);
+
+                r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
+                if (r < 0)
                         return r;
-                }
 
                 source->state = STATE_DATA_FINISH;
 
@@ -409,7 +415,7 @@ int process_data(RemoteSource *source) {
 
         case STATE_DATA_FINISH:
                 r = get_data_newline(source);
-                log_debug("get_data_newline() -> %d", r);
+                // log_debug("get_data_newline() -> %d", r);
                 if (r < 0)
                         return r;
                 if (r == 0) {
@@ -438,7 +444,7 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
                 return r;
 
         /* We have a full event */
-        log_debug("Received a full event from source@%p fd:%d (%s)",
+        log_trace("Received full event from source@%p fd:%d (%s)",
                   source, source->fd, source->name);
 
         if (!source->iovw.count) {
@@ -451,8 +457,8 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
 
         r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
         if (r < 0)
-                log_error("Failed to write entry of %zu bytes: %s",
-                          iovw_size(&source->iovw), strerror(-r));
+                log_error_errno(r, "Failed to write entry of %zu bytes: %m",
+                                iovw_size(&source->iovw));
         else
                 r = 1;
 
@@ -478,7 +484,7 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
                 char *tmp;
 
                 tmp = realloc(source->buf, target);
-                if (tmp)
+                if (!tmp)
                         log_warning("Failed to reallocate buffer to (smaller) size %zu",
                                     target);
                 else {