chiark / gitweb /
Fix problem with allocating large buffers and log leftovers
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 8*1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 safe_close(source->fd);
34         }
35         free(source->name);
36         free(source->buf);
37         iovw_free_contents(&source->iovw);
38
39         sd_event_source_unref(source->event);
40
41         free(source);
42 }
43
44 static int get_line(RemoteSource *source, char **line, size_t *size) {
45         ssize_t n, remain;
46         char *c = NULL;
47         char *newbuf = NULL;
48         size_t newsize = 0;
49
50         assert(source);
51         assert(source->state == STATE_LINE);
52         assert(source->filled <= source->size);
53         assert(source->buf == NULL || source->size > 0);
54
55         while (true) {
56                 if (source->buf)
57                         c = memchr(source->buf + source->scanned, '\n',
58                                    source->filled - source->scanned);
59                 if (c != NULL)
60                         break;
61
62                 source->scanned = source->filled;
63                 if (source->scanned >= DATA_SIZE_MAX) {
64                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
65                         return -E2BIG;
66                 }
67
68                 if (source->fd < 0)
69                         /* we have to wait for some data to come to us */
70                         return -EWOULDBLOCK;
71
72                 if (source->size - source->filled < LINE_CHUNK &&
73                     !GREEDY_REALLOC(source->buf, source->size,
74                                     MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
75                                 return log_oom();
76
77                 assert(source->size - source->filled >= LINE_CHUNK ||
78                        source->size == DATA_SIZE_MAX);
79
80                 // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
81                 // to accomodate such big fields.
82
83                 n = read(source->fd, source->buf + source->filled,
84                          source->size - source->filled);
85                 if (n < 0) {
86                         if (errno != EAGAIN && errno != EWOULDBLOCK)
87                                 log_error("read(%d, ..., %zd): %m", source->fd,
88                                           source->size - source->filled);
89                         return -errno;
90                 } else if (n == 0)
91                         return 0;
92
93                 source->filled += n;
94         }
95
96         *line = source->buf;
97         *size = c + 1 - source->buf;
98
99         /* Check if something remains */
100         remain = source->buf + source->filled - c - 1;
101         assert(remain >= 0);
102         if (remain) {
103                 newsize = MAX(remain, LINE_CHUNK);
104                 newbuf = malloc(newsize);
105                 if (!newbuf)
106                         return log_oom();
107                 memcpy(newbuf, c + 1, remain);
108         }
109         source->buf = newbuf;
110         source->size = newsize;
111         source->filled = remain;
112         source->scanned = 0;
113
114         return 1;
115 }
116
117 int push_data(RemoteSource *source, const char *data, size_t size) {
118         assert(source);
119         assert(source->state != STATE_EOF);
120
121         if (!GREEDY_REALLOC(source->buf, source->size,
122                             source->filled + size)) {
123                 log_error("Failed to store received data of size %zu "
124                           "(in addition to existing %zu bytes with %zu filled): %s",
125                           size, source->size, source->filled, strerror(ENOMEM));
126                 return -ENOMEM;
127         }
128
129         memcpy(source->buf + source->filled, data, size);
130         source->filled += size;
131
132         return 0;
133 }
134
135 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
136         int n;
137         char *newbuf = NULL;
138         size_t newsize = 0, remain;
139
140         assert(source);
141         assert(source->state == STATE_DATA_START ||
142                source->state == STATE_DATA ||
143                source->state == STATE_DATA_FINISH);
144         assert(size <= DATA_SIZE_MAX);
145         assert(source->filled <= source->size);
146         assert(source->scanned <= source->filled);
147         assert(source->buf != NULL || source->size == 0);
148         assert(source->buf == NULL || source->size > 0);
149         assert(data);
150
151         while(source->filled < size) {
152                 if (source->fd < 0)
153                         /* we have to wait for some data to come to us */
154                         return -EWOULDBLOCK;
155
156                 if (!GREEDY_REALLOC(source->buf, source->size, size))
157                         return log_oom();
158
159                 n = read(source->fd, source->buf + source->filled,
160                          source->size - source->filled);
161                 if (n < 0) {
162                         if (errno != EAGAIN && errno != EWOULDBLOCK)
163                                 log_error("read(%d, ..., %zd): %m", source->fd,
164                                           source->size - source->filled);
165                         return -errno;
166                 } else if (n == 0)
167                         return 0;
168
169                 source->filled += n;
170         }
171
172         *data = source->buf;
173
174         /* Check if something remains */
175         assert(size <= source->filled);
176         remain = source->filled - size;
177         if (remain) {
178                 newsize = MAX(remain, LINE_CHUNK);
179                 newbuf = malloc(newsize);
180                 if (!newbuf)
181                         return log_oom();
182                 memcpy(newbuf, source->buf + size, remain);
183         }
184         source->buf = newbuf;
185         source->size = newsize;
186         source->filled = remain;
187         source->scanned = 0;
188
189         return 1;
190 }
191
192 static int get_data_size(RemoteSource *source) {
193         int r;
194         _cleanup_free_ void *data = NULL;
195
196         assert(source);
197         assert(source->state == STATE_DATA_START);
198         assert(source->data_size == 0);
199
200         r = fill_fixed_size(source, &data, sizeof(uint64_t));
201         if (r <= 0)
202                 return r;
203
204         source->data_size = le64toh( *(uint64_t *) data );
205         if (source->data_size > DATA_SIZE_MAX) {
206                 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
207                           source->data_size, DATA_SIZE_MAX);
208                 return -EINVAL;
209         }
210         if (source->data_size == 0)
211                 log_warning("Binary field with zero length");
212
213         return 1;
214 }
215
216 static int get_data_data(RemoteSource *source, void **data) {
217         int r;
218
219         assert(source);
220         assert(data);
221         assert(source->state == STATE_DATA);
222
223         r = fill_fixed_size(source, data, source->data_size);
224         if (r <= 0)
225                 return r;
226
227         return 1;
228 }
229
230 static int get_data_newline(RemoteSource *source) {
231         int r;
232         _cleanup_free_ char *data = NULL;
233
234         assert(source);
235         assert(source->state == STATE_DATA_FINISH);
236
237         r = fill_fixed_size(source, (void**) &data, 1);
238         if (r <= 0)
239                 return r;
240
241         assert(data);
242         if (*data != '\n') {
243                 log_error("expected newline, got '%c'", *data);
244                 return -EINVAL;
245         }
246
247         return 1;
248 }
249
250 static int process_dunder(RemoteSource *source, char *line, size_t n) {
251         const char *timestamp;
252         int r;
253
254         assert(line);
255         assert(n > 0);
256         assert(line[n-1] == '\n');
257
258         /* XXX: is it worth to support timestamps in extended format?
259          * We don't produce them, but who knows... */
260
261         timestamp = startswith(line, "__CURSOR=");
262         if (timestamp)
263                 /* ignore __CURSOR */
264                 return 1;
265
266         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
267         if (timestamp) {
268                 long long unsigned x;
269                 line[n-1] = '\0';
270                 r = safe_atollu(timestamp, &x);
271                 if (r < 0)
272                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
273                 else
274                         source->ts.realtime = x;
275                 return r < 0 ? r : 1;
276         }
277
278         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
279         if (timestamp) {
280                 long long unsigned x;
281                 line[n-1] = '\0';
282                 r = safe_atollu(timestamp, &x);
283                 if (r < 0)
284                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
285                 else
286                         source->ts.monotonic = x;
287                 return r < 0 ? r : 1;
288         }
289
290         timestamp = startswith(line, "__");
291         if (timestamp) {
292                 log_notice("Unknown dunder line %s", line);
293                 return 1;
294         }
295
296         /* no dunder */
297         return 0;
298 }
299
300 int process_data(RemoteSource *source) {
301         int r;
302
303         switch(source->state) {
304         case STATE_LINE: {
305                 char *line, *sep;
306                 size_t n;
307
308                 assert(source->data_size == 0);
309
310                 r = get_line(source, &line, &n);
311                 if (r < 0)
312                         return r;
313                 if (r == 0) {
314                         source->state = STATE_EOF;
315                         return r;
316                 }
317                 assert(n > 0);
318                 assert(line[n-1] == '\n');
319
320                 if (n == 1) {
321                         log_debug("Received empty line, event is ready");
322                         free(line);
323                         return 1;
324                 }
325
326                 r = process_dunder(source, line, n);
327                 if (r != 0) {
328                         free(line);
329                         return r < 0 ? r : 0;
330                 }
331
332                 /* MESSAGE=xxx\n
333                    or
334                    COREDUMP\n
335                    LLLLLLLL0011223344...\n
336                 */
337                 sep = memchr(line, '=', n);
338                 if (sep)
339                         /* chomp newline */
340                         n--;
341                 else
342                         /* replace \n with = */
343                         line[n-1] = '=';
344                 log_debug("Received: %.*s", (int) n, line);
345
346                 r = iovw_put(&source->iovw, line, n);
347                 if (r < 0) {
348                         log_error("Failed to put line in iovect");
349                         free(line);
350                         return r;
351                 }
352
353                 if (!sep)
354                         source->state = STATE_DATA_START;
355                 return 0; /* continue */
356         }
357
358         case STATE_DATA_START:
359                 assert(source->data_size == 0);
360
361                 r = get_data_size(source);
362                 log_debug("get_data_size() -> %d", r);
363                 if (r < 0)
364                         return r;
365                 if (r == 0) {
366                         source->state = STATE_EOF;
367                         return 0;
368                 }
369
370                 source->state = source->data_size > 0 ?
371                         STATE_DATA : STATE_DATA_FINISH;
372
373                 return 0; /* continue */
374
375         case STATE_DATA: {
376                 void *data;
377
378                 assert(source->data_size > 0);
379
380                 r = get_data_data(source, &data);
381                 log_debug("get_data_data() -> %d", r);
382                 if (r < 0)
383                         return r;
384                 if (r == 0) {
385                         source->state = STATE_EOF;
386                         return 0;
387                 }
388
389                 assert(data);
390
391                 r = iovw_put(&source->iovw, data, source->data_size);
392                 if (r < 0) {
393                         log_error("failed to put binary buffer in iovect");
394                         return r;
395                 }
396
397                 source->state = STATE_DATA_FINISH;
398
399                 return 0; /* continue */
400         }
401
402         case STATE_DATA_FINISH:
403                 r = get_data_newline(source);
404                 log_debug("get_data_newline() -> %d", r);
405                 if (r < 0)
406                         return r;
407                 if (r == 0) {
408                         source->state = STATE_EOF;
409                         return 0;
410                 }
411
412                 source->data_size = 0;
413                 source->state = STATE_LINE;
414
415                 return 0; /* continue */
416         default:
417                 assert_not_reached("wtf?");
418         }
419 }
420
421 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
422         int r;
423
424         assert(source);
425         assert(writer);
426
427         r = process_data(source);
428         if (r <= 0)
429                 return r;
430
431         /* We have a full event */
432         log_info("Received a full event from source@%p fd:%d (%s)",
433                  source, source->fd, source->name);
434
435         if (!source->iovw.count) {
436                 log_warning("Entry with no payload, skipping");
437                 goto freeing;
438         }
439
440         assert(source->iovw.iovec);
441         assert(source->iovw.count);
442
443         r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
444         if (r < 0)
445                 log_error("Failed to write entry of %zu bytes: %s",
446                           iovw_size(&source->iovw), strerror(-r));
447         else
448                 r = 1;
449
450  freeing:
451         iovw_free_contents(&source->iovw);
452         return r;
453 }