chiark / gitweb /
Revert "socket: add support for TCP fast Open"
[elogind.git] / src / journal-remote / journal-remote-write.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2012 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote.h"
23
24 int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
25         if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
26                 return log_oom();
27
28         iovw->iovec[iovw->count++] = (struct iovec) {data, len};
29         return 0;
30 }
31
32 void iovw_free_contents(struct iovec_wrapper *iovw) {
33         free(iovw->iovec);
34         iovw->iovec = NULL;
35         iovw->size_bytes = iovw->count = 0;
36 }
37
38 size_t iovw_size(struct iovec_wrapper *iovw) {
39         size_t n = 0, i;
40
41         for (i = 0; i < iovw->count; i++)
42                 n += iovw->iovec[i].iov_len;
43
44         return n;
45 }
46
47 void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
48         size_t i;
49
50         for (i = 0; i < iovw->count; i++)
51                 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
52 }
53
54 /**********************************************************************
55  **********************************************************************
56  **********************************************************************/
57
58 static int do_rotate(JournalFile **f, bool compress, bool seal) {
59         int r = journal_file_rotate(f, compress, seal);
60         if (r < 0) {
61                 if (*f)
62                         log_error("Failed to rotate %s: %s", (*f)->path,
63                                   strerror(-r));
64                 else
65                         log_error("Failed to create rotated journal: %s",
66                                   strerror(-r));
67         }
68
69         return r;
70 }
71
72 Writer* writer_new(RemoteServer *server) {
73         Writer *w;
74
75         w = new0(Writer, 1);
76         if (!w)
77                 return NULL;
78
79         memset(&w->metrics, 0xFF, sizeof(w->metrics));
80
81         w->mmap = mmap_cache_new();
82         if (!w->mmap) {
83                 free(w);
84                 return NULL;
85         }
86
87         w->n_ref = 1;
88         w->server = server;
89
90         return w;
91 }
92
93 Writer* writer_free(Writer *w) {
94         if (!w)
95                 return NULL;
96
97         if (w->journal) {
98                 log_debug("Closing journal file %s.", w->journal->path);
99                 journal_file_close(w->journal);
100         }
101
102         if (w->server) {
103                 w->server->event_count += w->seqnum;
104                 if (w->hashmap_key)
105                         hashmap_remove(w->server->writers, w->hashmap_key);
106         }
107
108         free(w->hashmap_key);
109
110         if (w->mmap)
111                 mmap_cache_unref(w->mmap);
112
113         free(w);
114
115         return NULL;
116 }
117
118 Writer* writer_unref(Writer *w) {
119         if (w && (-- w->n_ref <= 0))
120                 writer_free(w);
121
122         return NULL;
123 }
124
125 Writer* writer_ref(Writer *w) {
126         if (w)
127                 assert_se(++ w->n_ref >= 2);
128
129         return w;
130 }
131
132
133 int writer_write(Writer *s,
134                  struct iovec_wrapper *iovw,
135                  dual_timestamp *ts,
136                  bool compress,
137                  bool seal) {
138         int r;
139
140         assert(s);
141         assert(iovw);
142         assert(iovw->count > 0);
143
144         if (journal_file_rotate_suggested(s->journal, 0)) {
145                 log_info("%s: Journal header limits reached or header out-of-date, rotating",
146                          s->journal->path);
147                 r = do_rotate(&s->journal, compress, seal);
148                 if (r < 0)
149                         return r;
150         }
151
152         r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count,
153                                       &s->seqnum, NULL, NULL);
154         if (r >= 0)
155                 return 1;
156
157         log_debug("%s: Write failed, rotating: %s", s->journal->path, strerror(-r));
158         r = do_rotate(&s->journal, compress, seal);
159         if (r < 0)
160                 return r;
161         else
162                 log_info("%s: Successfully rotated journal", s->journal->path);
163
164         log_debug("Retrying write.");
165         r = journal_file_append_entry(s->journal, ts, iovw->iovec, iovw->count,
166                                       &s->seqnum, NULL, NULL);
167         return r < 0 ? r : 1;
168 }