chiark / gitweb /
journal-remote: rework fd and writer reference handling
[elogind.git] / src / journal-remote / journal-remote-parse.c
index fe21bd3..cdc920e 100644 (file)
 #include "journal-remote-parse.h"
 #include "journald-native.h"
 
-#define LINE_CHUNK 1024u
+#define LINE_CHUNK 8*1024u
 
 void source_free(RemoteSource *source) {
         if (!source)
                 return;
 
-        if (source->fd >= 0) {
+        if (source->fd >= 0 && !source->passive_fd) {
                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
-                close(source->fd);
+                safe_close(source->fd);
         }
+
         free(source->name);
         free(source->buf);
         iovw_free_contents(&source->iovw);
+
+        log_debug("Writer ref count %u", source->writer->n_ref);
+        writer_unref(source->writer);
+
+        sd_event_source_unref(source->event);
+
         free(source);
 }
 
+/**
+ * Initialize zero-filled source with given values. On success, takes
+ * ownerhship of fd and writer, otherwise does not touch them.
+ */
+RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
+
+        RemoteSource *source;
+
+        log_debug("Creating source for %sfd:%d (%s)",
+                  passive_fd ? "passive " : "", fd, name);
+
+        assert(fd >= 0);
+
+        source = new0(RemoteSource, 1);
+        if (!source)
+                return NULL;
+
+        source->fd = fd;
+        source->passive_fd = passive_fd;
+        source->name = name;
+        source->writer = writer;
+
+        return source;
+}
+
 static int get_line(RemoteSource *source, char **line, size_t *size) {
         ssize_t n, remain;
         char *c = NULL;
@@ -48,6 +80,7 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
         assert(source->state == STATE_LINE);
         assert(source->filled <= source->size);
         assert(source->buf == NULL || source->size > 0);
+        assert(source->fd >= 0);
 
         while (true) {
                 if (source->buf)
@@ -62,19 +95,23 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
                         return -E2BIG;
                 }
 
-                if (source->fd < 0)
+                if (source->passive_fd)
                         /* we have to wait for some data to come to us */
                         return -EWOULDBLOCK;
 
                 if (source->size - source->filled < LINE_CHUNK &&
                     !GREEDY_REALLOC(source->buf, source->size,
-                                    MAX(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
+                                    MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
                                 return log_oom();
 
-                assert(source->size - source->filled >= LINE_CHUNK);
+                assert(source->size - source->filled >= LINE_CHUNK ||
+                       source->size == DATA_SIZE_MAX);
+
+                // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
+                // to accomodate such big fields.
 
                 n = read(source->fd, source->buf + source->filled,
-                         MAX(source->size, DATA_SIZE_MAX) - source->filled);
+                         source->size - source->filled);
                 if (n < 0) {
                         if (errno != EAGAIN && errno != EWOULDBLOCK)
                                 log_error("read(%d, ..., %zd): %m", source->fd,
@@ -139,10 +176,11 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
         assert(source->scanned <= source->filled);
         assert(source->buf != NULL || source->size == 0);
         assert(source->buf == NULL || source->size > 0);
+        assert(source->fd >= 0);
         assert(data);
 
         while(source->filled < size) {
-                if (source->fd < 0)
+                if (source->passive_fd)
                         /* we have to wait for some data to come to us */
                         return -EWOULDBLOCK;
 
@@ -196,7 +234,7 @@ static int get_data_size(RemoteSource *source) {
 
         source->data_size = le64toh( *(uint64_t *) data );
         if (source->data_size > DATA_SIZE_MAX) {
-                log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
+                log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
                           source->data_size, DATA_SIZE_MAX);
                 return -EINVAL;
         }
@@ -411,19 +449,19 @@ int process_data(RemoteSource *source) {
         }
 }
 
-int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
+int process_source(RemoteSource *source, bool compress, bool seal) {
         int r;
 
         assert(source);
-        assert(writer);
+        assert(source->writer);
 
         r = process_data(source);
         if (r <= 0)
                 return r;
 
         /* We have a full event */
-        log_info("Received a full event from source@%p fd:%d (%s)",
-                 source, source->fd, source->name);
+        log_debug("Received a full event from source@%p fd:%d (%s)",
+                  source, source->fd, source->name);
 
         if (!source->iovw.count) {
                 log_warning("Entry with no payload, skipping");
@@ -433,7 +471,7 @@ int process_source(RemoteSource *source, Writer *writer, bool compress, bool sea
         assert(source->iovw.iovec);
         assert(source->iovw.count);
 
-        r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
+        r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
         if (r < 0)
                 log_error("Failed to write entry of %zu bytes: %s",
                           iovw_size(&source->iovw), strerror(-r));