chiark / gitweb /
c961844c443e7f08d86a0052602e78b73b907821
[elogind.git] / src / journal / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 close(source->fd);
34         }
35         free(source->name);
36         free(source->buf);
37         iovw_free_contents(&source->iovw);
38         free(source);
39 }
40
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
42         ssize_t n, remain;
43         char *c;
44         char *newbuf = NULL;
45         size_t newsize = 0;
46
47         assert(source);
48         assert(source->state == STATE_LINE);
49         assert(source->filled <= source->size);
50         assert(source->buf == NULL || source->size > 0);
51
52         c = memchr(source->buf, '\n', source->filled);
53         if (c != NULL)
54                 goto docopy;
55
56  resize:
57         if (source->size - source->filled < LINE_CHUNK) {
58                 // XXX: add check for maximum line length
59
60                 if (!GREEDY_REALLOC(source->buf, source->size,
61                                     source->filled + LINE_CHUNK))
62                         return log_oom();
63         }
64         assert(source->size - source->filled >= LINE_CHUNK);
65
66         n = read(source->fd, source->buf + source->filled,
67                  source->size - source->filled);
68         if (n < 0) {
69                 if (errno != EAGAIN && errno != EWOULDBLOCK)
70                         log_error("read(%d, ..., %zd): %m", source->fd,
71                                   source->size - source->filled);
72                 return -errno;
73         } else if (n == 0)
74                 return 0;
75
76         c = memchr(source->buf + source->filled, '\n', n);
77         source->filled += n;
78
79         if (c == NULL)
80                 goto resize;
81
82  docopy:
83         *line = source->buf;
84         *size = c + 1 - source->buf;
85
86         /* Check if something remains */
87         remain = source->buf + source->filled - c - 1;
88         assert(remain >= 0);
89         if (remain) {
90                 newsize = MAX(remain, LINE_CHUNK);
91                 newbuf = malloc(newsize);
92                 if (!newbuf)
93                         return log_oom();
94                 memcpy(newbuf, c + 1, remain);
95         }
96         source->buf = newbuf;
97         source->size = newsize;
98         source->filled = remain;
99
100         return 1;
101 }
102
103 int push_data(RemoteSource *source, const char *data, size_t size) {
104         assert(source);
105         assert(source->state != STATE_EOF);
106
107         if (!GREEDY_REALLOC(source->buf, source->size,
108                             source->filled + size))
109                 return log_oom();
110
111         memcpy(source->buf + source->filled, data, size);
112         source->filled += size;
113
114         return 0;
115 }
116
117 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
118         int n;
119         char *newbuf = NULL;
120         size_t newsize = 0, remain;
121
122         assert(source);
123         assert(source->state == STATE_DATA_START ||
124                source->state == STATE_DATA ||
125                source->state == STATE_DATA_FINISH);
126         assert(size <= DATA_SIZE_MAX);
127         assert(source->filled <= source->size);
128         assert(source->buf != NULL || source->size == 0);
129         assert(source->buf == NULL || source->size > 0);
130         assert(data);
131
132         while(source->filled < size) {
133                 if (!GREEDY_REALLOC(source->buf, source->size, size))
134                         return log_oom();
135
136                 n = read(source->fd, source->buf + source->filled,
137                          source->size - source->filled);
138                 if (n < 0) {
139                         if (errno != EAGAIN && errno != EWOULDBLOCK)
140                                 log_error("read(%d, ..., %zd): %m", source->fd,
141                                           source->size - source->filled);
142                         return -errno;
143                 } else if (n == 0)
144                         return 0;
145
146                 source->filled += n;
147         }
148
149         *data = source->buf;
150
151         /* Check if something remains */
152         assert(size <= source->filled);
153         remain = source->filled - size;
154         if (remain) {
155                 newsize = MAX(remain, LINE_CHUNK);
156                 newbuf = malloc(newsize);
157                 if (!newbuf)
158                         return log_oom();
159                 memcpy(newbuf, source->buf + size, remain);
160         }
161         source->buf = newbuf;
162         source->size = newsize;
163         source->filled = remain;
164
165         return 1;
166 }
167
168 static int get_data_size(RemoteSource *source) {
169         int r;
170         void _cleanup_free_ *data = NULL;
171
172         assert(source);
173         assert(source->state == STATE_DATA_START);
174         assert(source->data_size == 0);
175
176         r = fill_fixed_size(source, &data, sizeof(uint64_t));
177         if (r <= 0)
178                 return r;
179
180         source->data_size = le64toh( *(uint64_t *) data );
181         if (source->data_size > DATA_SIZE_MAX) {
182                 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
183                           source->data_size, DATA_SIZE_MAX);
184                 return -EINVAL;
185         }
186         if (source->data_size == 0)
187                 log_warning("Binary field with zero length");
188
189         return 1;
190 }
191
192 static int get_data_data(RemoteSource *source, void **data) {
193         int r;
194
195         assert(source);
196         assert(data);
197         assert(source->state == STATE_DATA);
198
199         r = fill_fixed_size(source, data, source->data_size);
200         if (r <= 0)
201                 return r;
202
203         return 1;
204 }
205
206 static int get_data_newline(RemoteSource *source) {
207         int r;
208         char _cleanup_free_ *data = NULL;
209
210         assert(source);
211         assert(source->state == STATE_DATA_FINISH);
212
213         r = fill_fixed_size(source, (void**) &data, 1);
214         if (r <= 0)
215                 return r;
216
217         assert(data);
218         if (*data != '\n') {
219                 log_error("expected newline, got '%c'", *data);
220                 return -EINVAL;
221         }
222
223         return 1;
224 }
225
226 static int process_dunder(RemoteSource *source, char *line, size_t n) {
227         const char *timestamp;
228         int r;
229
230         assert(line);
231         assert(n > 0);
232         assert(line[n-1] == '\n');
233
234         /* XXX: is it worth to support timestamps in extended format?
235          * We don't produce them, but who knows... */
236
237         timestamp = startswith(line, "__CURSOR=");
238         if (timestamp)
239                 /* ignore __CURSOR */
240                 return 1;
241
242         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
243         if (timestamp) {
244                 long long unsigned x;
245                 line[n-1] = '\0';
246                 r = safe_atollu(timestamp, &x);
247                 if (r < 0)
248                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
249                 else
250                         source->ts.realtime = x;
251                 return r < 0 ? r : 1;
252         }
253
254         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
255         if (timestamp) {
256                 long long unsigned x;
257                 line[n-1] = '\0';
258                 r = safe_atollu(timestamp, &x);
259                 if (r < 0)
260                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
261                 else
262                         source->ts.monotonic = x;
263                 return r < 0 ? r : 1;
264         }
265
266         timestamp = startswith(line, "__");
267         if (timestamp) {
268                 log_notice("Unknown dunder line %s", line);
269                 return 1;
270         }
271
272         /* no dunder */
273         return 0;
274 }
275
276 int process_data(RemoteSource *source) {
277         int r;
278
279         switch(source->state) {
280         case STATE_LINE: {
281                 char *line, *sep;
282                 size_t n;
283
284                 assert(source->data_size == 0);
285
286                 r = get_line(source, &line, &n);
287                 if (r < 0)
288                         return r;
289                 if (r == 0) {
290                         source->state = STATE_EOF;
291                         return r;
292                 }
293                 assert(n > 0);
294                 assert(line[n-1] == '\n');
295
296                 if (n == 1) {
297                         log_debug("Received empty line, event is ready");
298                         free(line);
299                         return 1;
300                 }
301
302                 r = process_dunder(source, line, n);
303                 if (r != 0) {
304                         free(line);
305                         return r < 0 ? r : 0;
306                 }
307
308                 /* MESSAGE=xxx\n
309                    or
310                    COREDUMP\n
311                    LLLLLLLL0011223344...\n
312                 */
313                 sep = memchr(line, '=', n);
314                 if (sep)
315                         /* chomp newline */
316                         n--;
317                 else
318                         /* replace \n with = */
319                         line[n-1] = '=';
320                 log_debug("Received: %.*s", (int) n, line);
321
322                 r = iovw_put(&source->iovw, line, n);
323                 if (r < 0) {
324                         log_error("Failed to put line in iovect");
325                         free(line);
326                         return r;
327                 }
328
329                 if (!sep)
330                         source->state = STATE_DATA_START;
331                 return 0; /* continue */
332         }
333
334         case STATE_DATA_START:
335                 assert(source->data_size == 0);
336
337                 r = get_data_size(source);
338                 log_debug("get_data_size() -> %d", r);
339                 if (r < 0)
340                         return r;
341                 if (r == 0) {
342                         source->state = STATE_EOF;
343                         return 0;
344                 }
345
346                 source->state = source->data_size > 0 ?
347                         STATE_DATA : STATE_DATA_FINISH;
348
349                 return 0; /* continue */
350
351         case STATE_DATA: {
352                 void *data;
353
354                 assert(source->data_size > 0);
355
356                 r = get_data_data(source, &data);
357                 log_debug("get_data_data() -> %d", r);
358                 if (r < 0)
359                         return r;
360                 if (r == 0) {
361                         source->state = STATE_EOF;
362                         return 0;
363                 }
364
365                 assert(data);
366
367                 r = iovw_put(&source->iovw, data, source->data_size);
368                 if (r < 0) {
369                         log_error("failed to put binary buffer in iovect");
370                         return r;
371                 }
372
373                 source->state = STATE_DATA_FINISH;
374
375                 return 0; /* continue */
376         }
377
378         case STATE_DATA_FINISH:
379                 r = get_data_newline(source);
380                 log_debug("get_data_newline() -> %d", r);
381                 if (r < 0)
382                         return r;
383                 if (r == 0) {
384                         source->state = STATE_EOF;
385                         return 0;
386                 }
387
388                 source->data_size = 0;
389                 source->state = STATE_LINE;
390
391                 return 0; /* continue */
392         default:
393                 assert_not_reached("wtf?");
394         }
395 }
396
397 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
398         int r;
399
400         assert(source);
401         assert(writer);
402
403         r = process_data(source);
404         if (r <= 0)
405                 return r;
406
407         /* We have a full event */
408         log_info("Received a full event from source@%p fd:%d (%s)",
409                  source, source->fd, source->name);
410
411         if (!source->iovw.count) {
412                 log_warning("Entry with no payload, skipping");
413                 goto freeing;
414         }
415
416         assert(source->iovw.iovec);
417         assert(source->iovw.count);
418
419         r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
420         if (r < 0)
421                 log_error("Failed to write entry of %zu bytes: %s",
422                           iovw_size(&source->iovw), strerror(-r));
423         else
424                 r = 1;
425
426  freeing:
427         iovw_free_contents(&source->iovw);
428         return r;
429 }