chiark / gitweb /
journal-remote: avoid copying input data
[elogind.git] / src / journal-remote / journal-remote-write.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2012 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote.h"
23 #include "journal-remote-write.h"
24
25 int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
26         if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
27                 return log_oom();
28
29         iovw->iovec[iovw->count++] = (struct iovec) {data, len};
30         return 0;
31 }
32
33 void iovw_free_contents(struct iovec_wrapper *iovw) {
34         free(iovw->iovec);
35         iovw->iovec = NULL;
36         iovw->size_bytes = iovw->count = 0;
37 }
38
39 size_t iovw_size(struct iovec_wrapper *iovw) {
40         size_t n = 0, i;
41
42         for (i = 0; i < iovw->count; i++)
43                 n += iovw->iovec[i].iov_len;
44
45         return n;
46 }
47
48 void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
49         size_t i;
50
51         for (i = 0; i < iovw->count; i++)
52                 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
53 }
54
55 /**********************************************************************
56  **********************************************************************
57  **********************************************************************/
58
59 static int do_rotate(JournalFile **f, bool compress, bool seal) {
60         int r = journal_file_rotate(f, compress, seal);
61         if (r < 0) {
62                 if (*f)
63                         log_error("Failed to rotate %s: %s", (*f)->path,
64                                   strerror(-r));
65                 else
66                         log_error("Failed to create rotated journal: %s",
67                                   strerror(-r));
68         }
69
70         return r;
71 }
72
73 Writer* writer_new(RemoteServer *server) {
74         Writer *w;
75
76         w = new0(Writer, 1);
77         if (!w)
78                 return NULL;
79
80         memset(&w->metrics, 0xFF, sizeof(w->metrics));
81
82         w->mmap = mmap_cache_new();
83         if (!w->mmap) {
84                 free(w);
85                 return NULL;
86         }
87
88         w->n_ref = 1;
89         w->server = server;
90
91         return w;
92 }
93
94 Writer* writer_free(Writer *w) {
95         if (!w)
96                 return NULL;
97
98         if (w->journal) {
99                 log_debug("Closing journal file %s.", w->journal->path);
100                 journal_file_close(w->journal);
101         }
102
103         if (w->server) {
104                 w->server->event_count += w->seqnum;
105                 if (w->hashmap_key)
106                         hashmap_remove(w->server->writers, w->hashmap_key);
107         }
108
109         free(w->hashmap_key);
110
111         if (w->mmap)
112                 mmap_cache_unref(w->mmap);
113
114         free(w);
115
116         return NULL;
117 }
118
119 Writer* writer_unref(Writer *w) {
120         if (w && (-- w->n_ref <= 0))
121                 writer_free(w);
122
123         return NULL;
124 }
125
126 Writer* writer_ref(Writer *w) {
127         if (w)
128                 assert_se(++ w->n_ref >= 2);
129
130         return w;
131 }
132
133
134 int writer_write(Writer *s,
135                  struct iovec_wrapper *iovw,
136                  dual_timestamp *ts,
137                  bool compress,
138                  bool seal) {
139         int r;
140
141         assert(s);
142         assert(iovw);
143         assert(iovw->count > 0);
144
145         if (journal_file_rotate_suggested(s->journal, 0)) {
146                 log_info("%s: Journal header limits reached or header out-of-date, rotating",
147                          s->journal->path);
148                 r = do_rotate(&s->journal, compress, seal);
149                 if (r < 0)
150                         return r;
151         }
152
153         r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count,
154                                       &s->seqnum, NULL, NULL);
155         if (r >= 0)
156                 return 1;
157
158         log_debug("%s: Write failed, rotating: %s", s->journal->path, strerror(-r));
159         r = do_rotate(&s->journal, compress, seal);
160         if (r < 0)
161                 return r;
162         else
163                 log_info("%s: Successfully rotated journal", s->journal->path);
164
165         log_debug("Retrying write.");
166         r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count,
167                                       &s->seqnum, NULL, NULL);
168         return r < 0 ? r : 1;
169 }