chiark / gitweb /
journal-remote: rework fd and writer reference handling
[elogind.git] / src / journal-remote / journal-remote-write.c
index 449636cd8c3180b389776151e69b179bcad4db5a..cdd06f9effc25a9238eb289e1b9121010304fbb4 100644 (file)
@@ -19,6 +19,7 @@
   along with systemd; If not, see <http://www.gnu.org/licenses/>.
 ***/
 
   along with systemd; If not, see <http://www.gnu.org/licenses/>.
 ***/
 
+#include "journal-remote.h"
 #include "journal-remote-write.h"
 
 int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
 #include "journal-remote-write.h"
 
 int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
@@ -64,32 +65,67 @@ static int do_rotate(JournalFile **f, bool compress, bool seal) {
         return r;
 }
 
         return r;
 }
 
-int writer_init(Writer *s) {
-        assert(s);
+Writer* writer_new(RemoteServer *server) {
+        Writer *w;
 
 
-        s->journal = NULL;
+        w = new0(Writer, 1);
+        if (!w)
+                return NULL;
 
 
-        memset(&s->metrics, 0xFF, sizeof(s->metrics));
+        memset(&w->metrics, 0xFF, sizeof(w->metrics));
 
 
-        s->mmap = mmap_cache_new();
-        if (!s->mmap)
-                return log_oom();
+        w->mmap = mmap_cache_new();
+        if (!w->mmap) {
+                free(w);
+                return NULL;
+        }
 
 
-        s->seqnum = 0;
+        w->n_ref = 1;
+        w->server = server;
 
 
-        return 0;
+        return w;
 }
 
 }
 
-int writer_close(Writer *s) {
-        if (s->journal) {
-                journal_file_close(s->journal);
-                log_debug("Journal has been closed.");
+Writer* writer_free(Writer *w) {
+        if (!w)
+                return NULL;
+
+        if (w->journal) {
+                log_debug("Closing journal file %s.", w->journal->path);
+                journal_file_close(w->journal);
         }
         }
-        if (s->mmap)
-                mmap_cache_unref(s->mmap);
-        return 0;
+
+        if (w->server) {
+                w->server->event_count += w->seqnum;
+                if (w->hashmap_key)
+                        hashmap_remove(w->server->writers, w->hashmap_key);
+        }
+
+        free(w->hashmap_key);
+
+        if (w->mmap)
+                mmap_cache_unref(w->mmap);
+
+        free(w);
+
+        return NULL;
+}
+
+Writer* writer_unref(Writer *w) {
+        if (w && (-- w->n_ref <= 0))
+                writer_free(w);
+
+        return NULL;
 }
 
 }
 
+Writer* writer_ref(Writer *w) {
+        if (w)
+                assert_se(++ w->n_ref >= 2);
+
+        return w;
+}
+
+
 int writer_write(Writer *s,
                  struct iovec_wrapper *iovw,
                  dual_timestamp *ts,
 int writer_write(Writer *s,
                  struct iovec_wrapper *iovw,
                  dual_timestamp *ts,
@@ -114,10 +150,12 @@ int writer_write(Writer *s,
         if (r >= 0)
                 return 1;
 
         if (r >= 0)
                 return 1;
 
-        log_info("%s: Write failed, rotating", s->journal->path);
+        log_debug("%s: Write failed, rotating: %s", s->journal->path, strerror(-r));
         r = do_rotate(&s->journal, compress, seal);
         if (r < 0)
                 return r;
         r = do_rotate(&s->journal, compress, seal);
         if (r < 0)
                 return r;
+        else
+                log_info("%s: Successfully rotated journal", s->journal->path);
 
         log_debug("Retrying write.");
         r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count,
 
         log_debug("Retrying write.");
         r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count,