chiark / gitweb /
journal-remote: reject fields above maximum size
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 close(source->fd);
34         }
35         free(source->name);
36         free(source->buf);
37         iovw_free_contents(&source->iovw);
38         free(source);
39 }
40
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
42         ssize_t n, remain;
43         char *c = NULL;
44         char *newbuf = NULL;
45         size_t newsize = 0;
46
47         assert(source);
48         assert(source->state == STATE_LINE);
49         assert(source->filled <= source->size);
50         assert(source->buf == NULL || source->size > 0);
51
52         while (true) {
53                 if (source->buf)
54                         c = memchr(source->buf + source->scanned, '\n',
55                                    source->filled - source->scanned);
56                 if (c != NULL)
57                         break;
58
59                 source->scanned = source->filled;
60                 if (source->scanned >= DATA_SIZE_MAX) {
61                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
62                         return -E2BIG;
63                 }
64
65                 if (source->fd < 0)
66                         /* we have to wait for some data to come to us */
67                         return -EWOULDBLOCK;
68
69                 if (source->size - source->filled < LINE_CHUNK &&
70                     !GREEDY_REALLOC(source->buf, source->size,
71                                     MAX(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
72                                 return log_oom();
73
74                 assert(source->size - source->filled >= LINE_CHUNK);
75
76                 n = read(source->fd, source->buf + source->filled,
77                          MAX(source->size, DATA_SIZE_MAX) - source->filled);
78                 if (n < 0) {
79                         if (errno != EAGAIN && errno != EWOULDBLOCK)
80                                 log_error("read(%d, ..., %zd): %m", source->fd,
81                                           source->size - source->filled);
82                         return -errno;
83                 } else if (n == 0)
84                         return 0;
85
86                 source->filled += n;
87         }
88
89         *line = source->buf;
90         *size = c + 1 - source->buf;
91
92         /* Check if something remains */
93         remain = source->buf + source->filled - c - 1;
94         assert(remain >= 0);
95         if (remain) {
96                 newsize = MAX(remain, LINE_CHUNK);
97                 newbuf = malloc(newsize);
98                 if (!newbuf)
99                         return log_oom();
100                 memcpy(newbuf, c + 1, remain);
101         }
102         source->buf = newbuf;
103         source->size = newsize;
104         source->filled = remain;
105         source->scanned = 0;
106
107         return 1;
108 }
109
110 int push_data(RemoteSource *source, const char *data, size_t size) {
111         assert(source);
112         assert(source->state != STATE_EOF);
113
114         if (!GREEDY_REALLOC(source->buf, source->size,
115                             source->filled + size)) {
116                 log_error("Failed to store received data of size %zu "
117                           "(in addition to existing %zu bytes with %zu filled): %s",
118                           size, source->size, source->filled, strerror(ENOMEM));
119                 return -ENOMEM;
120         }
121
122         memcpy(source->buf + source->filled, data, size);
123         source->filled += size;
124
125         return 0;
126 }
127
128 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
129         int n;
130         char *newbuf = NULL;
131         size_t newsize = 0, remain;
132
133         assert(source);
134         assert(source->state == STATE_DATA_START ||
135                source->state == STATE_DATA ||
136                source->state == STATE_DATA_FINISH);
137         assert(size <= DATA_SIZE_MAX);
138         assert(source->filled <= source->size);
139         assert(source->scanned <= source->filled);
140         assert(source->buf != NULL || source->size == 0);
141         assert(source->buf == NULL || source->size > 0);
142         assert(data);
143
144         while(source->filled < size) {
145                 if (source->fd < 0)
146                         /* we have to wait for some data to come to us */
147                         return -EWOULDBLOCK;
148
149                 if (!GREEDY_REALLOC(source->buf, source->size, size))
150                         return log_oom();
151
152                 n = read(source->fd, source->buf + source->filled,
153                          source->size - source->filled);
154                 if (n < 0) {
155                         if (errno != EAGAIN && errno != EWOULDBLOCK)
156                                 log_error("read(%d, ..., %zd): %m", source->fd,
157                                           source->size - source->filled);
158                         return -errno;
159                 } else if (n == 0)
160                         return 0;
161
162                 source->filled += n;
163         }
164
165         *data = source->buf;
166
167         /* Check if something remains */
168         assert(size <= source->filled);
169         remain = source->filled - size;
170         if (remain) {
171                 newsize = MAX(remain, LINE_CHUNK);
172                 newbuf = malloc(newsize);
173                 if (!newbuf)
174                         return log_oom();
175                 memcpy(newbuf, source->buf + size, remain);
176         }
177         source->buf = newbuf;
178         source->size = newsize;
179         source->filled = remain;
180         source->scanned = 0;
181
182         return 1;
183 }
184
185 static int get_data_size(RemoteSource *source) {
186         int r;
187         _cleanup_free_ void *data = NULL;
188
189         assert(source);
190         assert(source->state == STATE_DATA_START);
191         assert(source->data_size == 0);
192
193         r = fill_fixed_size(source, &data, sizeof(uint64_t));
194         if (r <= 0)
195                 return r;
196
197         source->data_size = le64toh( *(uint64_t *) data );
198         if (source->data_size > DATA_SIZE_MAX) {
199                 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
200                           source->data_size, DATA_SIZE_MAX);
201                 return -EINVAL;
202         }
203         if (source->data_size == 0)
204                 log_warning("Binary field with zero length");
205
206         return 1;
207 }
208
209 static int get_data_data(RemoteSource *source, void **data) {
210         int r;
211
212         assert(source);
213         assert(data);
214         assert(source->state == STATE_DATA);
215
216         r = fill_fixed_size(source, data, source->data_size);
217         if (r <= 0)
218                 return r;
219
220         return 1;
221 }
222
223 static int get_data_newline(RemoteSource *source) {
224         int r;
225         _cleanup_free_ char *data = NULL;
226
227         assert(source);
228         assert(source->state == STATE_DATA_FINISH);
229
230         r = fill_fixed_size(source, (void**) &data, 1);
231         if (r <= 0)
232                 return r;
233
234         assert(data);
235         if (*data != '\n') {
236                 log_error("expected newline, got '%c'", *data);
237                 return -EINVAL;
238         }
239
240         return 1;
241 }
242
243 static int process_dunder(RemoteSource *source, char *line, size_t n) {
244         const char *timestamp;
245         int r;
246
247         assert(line);
248         assert(n > 0);
249         assert(line[n-1] == '\n');
250
251         /* XXX: is it worth to support timestamps in extended format?
252          * We don't produce them, but who knows... */
253
254         timestamp = startswith(line, "__CURSOR=");
255         if (timestamp)
256                 /* ignore __CURSOR */
257                 return 1;
258
259         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
260         if (timestamp) {
261                 long long unsigned x;
262                 line[n-1] = '\0';
263                 r = safe_atollu(timestamp, &x);
264                 if (r < 0)
265                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
266                 else
267                         source->ts.realtime = x;
268                 return r < 0 ? r : 1;
269         }
270
271         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
272         if (timestamp) {
273                 long long unsigned x;
274                 line[n-1] = '\0';
275                 r = safe_atollu(timestamp, &x);
276                 if (r < 0)
277                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
278                 else
279                         source->ts.monotonic = x;
280                 return r < 0 ? r : 1;
281         }
282
283         timestamp = startswith(line, "__");
284         if (timestamp) {
285                 log_notice("Unknown dunder line %s", line);
286                 return 1;
287         }
288
289         /* no dunder */
290         return 0;
291 }
292
293 int process_data(RemoteSource *source) {
294         int r;
295
296         switch(source->state) {
297         case STATE_LINE: {
298                 char *line, *sep;
299                 size_t n;
300
301                 assert(source->data_size == 0);
302
303                 r = get_line(source, &line, &n);
304                 if (r < 0)
305                         return r;
306                 if (r == 0) {
307                         source->state = STATE_EOF;
308                         return r;
309                 }
310                 assert(n > 0);
311                 assert(line[n-1] == '\n');
312
313                 if (n == 1) {
314                         log_debug("Received empty line, event is ready");
315                         free(line);
316                         return 1;
317                 }
318
319                 r = process_dunder(source, line, n);
320                 if (r != 0) {
321                         free(line);
322                         return r < 0 ? r : 0;
323                 }
324
325                 /* MESSAGE=xxx\n
326                    or
327                    COREDUMP\n
328                    LLLLLLLL0011223344...\n
329                 */
330                 sep = memchr(line, '=', n);
331                 if (sep)
332                         /* chomp newline */
333                         n--;
334                 else
335                         /* replace \n with = */
336                         line[n-1] = '=';
337                 log_debug("Received: %.*s", (int) n, line);
338
339                 r = iovw_put(&source->iovw, line, n);
340                 if (r < 0) {
341                         log_error("Failed to put line in iovect");
342                         free(line);
343                         return r;
344                 }
345
346                 if (!sep)
347                         source->state = STATE_DATA_START;
348                 return 0; /* continue */
349         }
350
351         case STATE_DATA_START:
352                 assert(source->data_size == 0);
353
354                 r = get_data_size(source);
355                 log_debug("get_data_size() -> %d", r);
356                 if (r < 0)
357                         return r;
358                 if (r == 0) {
359                         source->state = STATE_EOF;
360                         return 0;
361                 }
362
363                 source->state = source->data_size > 0 ?
364                         STATE_DATA : STATE_DATA_FINISH;
365
366                 return 0; /* continue */
367
368         case STATE_DATA: {
369                 void *data;
370
371                 assert(source->data_size > 0);
372
373                 r = get_data_data(source, &data);
374                 log_debug("get_data_data() -> %d", r);
375                 if (r < 0)
376                         return r;
377                 if (r == 0) {
378                         source->state = STATE_EOF;
379                         return 0;
380                 }
381
382                 assert(data);
383
384                 r = iovw_put(&source->iovw, data, source->data_size);
385                 if (r < 0) {
386                         log_error("failed to put binary buffer in iovect");
387                         return r;
388                 }
389
390                 source->state = STATE_DATA_FINISH;
391
392                 return 0; /* continue */
393         }
394
395         case STATE_DATA_FINISH:
396                 r = get_data_newline(source);
397                 log_debug("get_data_newline() -> %d", r);
398                 if (r < 0)
399                         return r;
400                 if (r == 0) {
401                         source->state = STATE_EOF;
402                         return 0;
403                 }
404
405                 source->data_size = 0;
406                 source->state = STATE_LINE;
407
408                 return 0; /* continue */
409         default:
410                 assert_not_reached("wtf?");
411         }
412 }
413
414 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
415         int r;
416
417         assert(source);
418         assert(writer);
419
420         r = process_data(source);
421         if (r <= 0)
422                 return r;
423
424         /* We have a full event */
425         log_info("Received a full event from source@%p fd:%d (%s)",
426                  source, source->fd, source->name);
427
428         if (!source->iovw.count) {
429                 log_warning("Entry with no payload, skipping");
430                 goto freeing;
431         }
432
433         assert(source->iovw.iovec);
434         assert(source->iovw.count);
435
436         r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
437         if (r < 0)
438                 log_error("Failed to write entry of %zu bytes: %s",
439                           iovw_size(&source->iovw), strerror(-r));
440         else
441                 r = 1;
442
443  freeing:
444         iovw_free_contents(&source->iovw);
445         return r;
446 }