chiark / gitweb /
239ff381975703c920b0a9ad6d1e2348fe3905c6
[elogind.git] / src / journal / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 close(source->fd);
34         }
35         free(source->name);
36         free(source->buf);
37         iovw_free_contents(&source->iovw);
38         free(source);
39 }
40
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
42         ssize_t n, remain;
43         char *c = NULL;
44         char *newbuf = NULL;
45         size_t newsize = 0;
46
47         assert(source);
48         assert(source->state == STATE_LINE);
49         assert(source->filled <= source->size);
50         assert(source->buf == NULL || source->size > 0);
51
52         if (source->buf)
53                 c = memchr(source->buf, '\n', source->filled);
54
55         if (c != NULL)
56                 goto docopy;
57
58  resize:
59         if (source->fd < 0)
60                 /* we have to wait for some data to come to us */
61                 return -EWOULDBLOCK;
62
63         if (source->size - source->filled < LINE_CHUNK) {
64                 // XXX: add check for maximum line length
65
66                 if (!GREEDY_REALLOC(source->buf, source->size,
67                                     source->filled + LINE_CHUNK))
68                         return log_oom();
69         }
70         assert(source->size - source->filled >= LINE_CHUNK);
71
72         n = read(source->fd, source->buf + source->filled,
73                  source->size - source->filled);
74         if (n < 0) {
75                 if (errno != EAGAIN && errno != EWOULDBLOCK)
76                         log_error("read(%d, ..., %zd): %m", source->fd,
77                                   source->size - source->filled);
78                 return -errno;
79         } else if (n == 0)
80                 return 0;
81
82         c = memchr(source->buf + source->filled, '\n', n);
83         source->filled += n;
84
85         if (c == NULL)
86                 goto resize;
87
88  docopy:
89         *line = source->buf;
90         *size = c + 1 - source->buf;
91
92         /* Check if something remains */
93         remain = source->buf + source->filled - c - 1;
94         assert(remain >= 0);
95         if (remain) {
96                 newsize = MAX(remain, LINE_CHUNK);
97                 newbuf = malloc(newsize);
98                 if (!newbuf)
99                         return log_oom();
100                 memcpy(newbuf, c + 1, remain);
101         }
102         source->buf = newbuf;
103         source->size = newsize;
104         source->filled = remain;
105
106         return 1;
107 }
108
109 int push_data(RemoteSource *source, const char *data, size_t size) {
110         assert(source);
111         assert(source->state != STATE_EOF);
112
113         if (!GREEDY_REALLOC(source->buf, source->size,
114                             source->filled + size))
115                 return log_oom();
116
117         memcpy(source->buf + source->filled, data, size);
118         source->filled += size;
119
120         return 0;
121 }
122
123 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
124         int n;
125         char *newbuf = NULL;
126         size_t newsize = 0, remain;
127
128         assert(source);
129         assert(source->state == STATE_DATA_START ||
130                source->state == STATE_DATA ||
131                source->state == STATE_DATA_FINISH);
132         assert(size <= DATA_SIZE_MAX);
133         assert(source->filled <= source->size);
134         assert(source->buf != NULL || source->size == 0);
135         assert(source->buf == NULL || source->size > 0);
136         assert(data);
137
138         while(source->filled < size) {
139                 if (source->fd < 0)
140                         /* we have to wait for some data to come to us */
141                         return -EWOULDBLOCK;
142
143                 if (!GREEDY_REALLOC(source->buf, source->size, size))
144                         return log_oom();
145
146                 n = read(source->fd, source->buf + source->filled,
147                          source->size - source->filled);
148                 if (n < 0) {
149                         if (errno != EAGAIN && errno != EWOULDBLOCK)
150                                 log_error("read(%d, ..., %zd): %m", source->fd,
151                                           source->size - source->filled);
152                         return -errno;
153                 } else if (n == 0)
154                         return 0;
155
156                 source->filled += n;
157         }
158
159         *data = source->buf;
160
161         /* Check if something remains */
162         assert(size <= source->filled);
163         remain = source->filled - size;
164         if (remain) {
165                 newsize = MAX(remain, LINE_CHUNK);
166                 newbuf = malloc(newsize);
167                 if (!newbuf)
168                         return log_oom();
169                 memcpy(newbuf, source->buf + size, remain);
170         }
171         source->buf = newbuf;
172         source->size = newsize;
173         source->filled = remain;
174
175         return 1;
176 }
177
178 static int get_data_size(RemoteSource *source) {
179         int r;
180         void _cleanup_free_ *data = NULL;
181
182         assert(source);
183         assert(source->state == STATE_DATA_START);
184         assert(source->data_size == 0);
185
186         r = fill_fixed_size(source, &data, sizeof(uint64_t));
187         if (r <= 0)
188                 return r;
189
190         source->data_size = le64toh( *(uint64_t *) data );
191         if (source->data_size > DATA_SIZE_MAX) {
192                 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
193                           source->data_size, DATA_SIZE_MAX);
194                 return -EINVAL;
195         }
196         if (source->data_size == 0)
197                 log_warning("Binary field with zero length");
198
199         return 1;
200 }
201
202 static int get_data_data(RemoteSource *source, void **data) {
203         int r;
204
205         assert(source);
206         assert(data);
207         assert(source->state == STATE_DATA);
208
209         r = fill_fixed_size(source, data, source->data_size);
210         if (r <= 0)
211                 return r;
212
213         return 1;
214 }
215
216 static int get_data_newline(RemoteSource *source) {
217         int r;
218         char _cleanup_free_ *data = NULL;
219
220         assert(source);
221         assert(source->state == STATE_DATA_FINISH);
222
223         r = fill_fixed_size(source, (void**) &data, 1);
224         if (r <= 0)
225                 return r;
226
227         assert(data);
228         if (*data != '\n') {
229                 log_error("expected newline, got '%c'", *data);
230                 return -EINVAL;
231         }
232
233         return 1;
234 }
235
236 static int process_dunder(RemoteSource *source, char *line, size_t n) {
237         const char *timestamp;
238         int r;
239
240         assert(line);
241         assert(n > 0);
242         assert(line[n-1] == '\n');
243
244         /* XXX: is it worth to support timestamps in extended format?
245          * We don't produce them, but who knows... */
246
247         timestamp = startswith(line, "__CURSOR=");
248         if (timestamp)
249                 /* ignore __CURSOR */
250                 return 1;
251
252         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
253         if (timestamp) {
254                 long long unsigned x;
255                 line[n-1] = '\0';
256                 r = safe_atollu(timestamp, &x);
257                 if (r < 0)
258                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
259                 else
260                         source->ts.realtime = x;
261                 return r < 0 ? r : 1;
262         }
263
264         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
265         if (timestamp) {
266                 long long unsigned x;
267                 line[n-1] = '\0';
268                 r = safe_atollu(timestamp, &x);
269                 if (r < 0)
270                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
271                 else
272                         source->ts.monotonic = x;
273                 return r < 0 ? r : 1;
274         }
275
276         timestamp = startswith(line, "__");
277         if (timestamp) {
278                 log_notice("Unknown dunder line %s", line);
279                 return 1;
280         }
281
282         /* no dunder */
283         return 0;
284 }
285
286 int process_data(RemoteSource *source) {
287         int r;
288
289         switch(source->state) {
290         case STATE_LINE: {
291                 char *line, *sep;
292                 size_t n;
293
294                 assert(source->data_size == 0);
295
296                 r = get_line(source, &line, &n);
297                 if (r < 0)
298                         return r;
299                 if (r == 0) {
300                         source->state = STATE_EOF;
301                         return r;
302                 }
303                 assert(n > 0);
304                 assert(line[n-1] == '\n');
305
306                 if (n == 1) {
307                         log_debug("Received empty line, event is ready");
308                         free(line);
309                         return 1;
310                 }
311
312                 r = process_dunder(source, line, n);
313                 if (r != 0) {
314                         free(line);
315                         return r < 0 ? r : 0;
316                 }
317
318                 /* MESSAGE=xxx\n
319                    or
320                    COREDUMP\n
321                    LLLLLLLL0011223344...\n
322                 */
323                 sep = memchr(line, '=', n);
324                 if (sep)
325                         /* chomp newline */
326                         n--;
327                 else
328                         /* replace \n with = */
329                         line[n-1] = '=';
330                 log_debug("Received: %.*s", (int) n, line);
331
332                 r = iovw_put(&source->iovw, line, n);
333                 if (r < 0) {
334                         log_error("Failed to put line in iovect");
335                         free(line);
336                         return r;
337                 }
338
339                 if (!sep)
340                         source->state = STATE_DATA_START;
341                 return 0; /* continue */
342         }
343
344         case STATE_DATA_START:
345                 assert(source->data_size == 0);
346
347                 r = get_data_size(source);
348                 log_debug("get_data_size() -> %d", r);
349                 if (r < 0)
350                         return r;
351                 if (r == 0) {
352                         source->state = STATE_EOF;
353                         return 0;
354                 }
355
356                 source->state = source->data_size > 0 ?
357                         STATE_DATA : STATE_DATA_FINISH;
358
359                 return 0; /* continue */
360
361         case STATE_DATA: {
362                 void *data;
363
364                 assert(source->data_size > 0);
365
366                 r = get_data_data(source, &data);
367                 log_debug("get_data_data() -> %d", r);
368                 if (r < 0)
369                         return r;
370                 if (r == 0) {
371                         source->state = STATE_EOF;
372                         return 0;
373                 }
374
375                 assert(data);
376
377                 r = iovw_put(&source->iovw, data, source->data_size);
378                 if (r < 0) {
379                         log_error("failed to put binary buffer in iovect");
380                         return r;
381                 }
382
383                 source->state = STATE_DATA_FINISH;
384
385                 return 0; /* continue */
386         }
387
388         case STATE_DATA_FINISH:
389                 r = get_data_newline(source);
390                 log_debug("get_data_newline() -> %d", r);
391                 if (r < 0)
392                         return r;
393                 if (r == 0) {
394                         source->state = STATE_EOF;
395                         return 0;
396                 }
397
398                 source->data_size = 0;
399                 source->state = STATE_LINE;
400
401                 return 0; /* continue */
402         default:
403                 assert_not_reached("wtf?");
404         }
405 }
406
407 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
408         int r;
409
410         assert(source);
411         assert(writer);
412
413         r = process_data(source);
414         if (r <= 0)
415                 return r;
416
417         /* We have a full event */
418         log_info("Received a full event from source@%p fd:%d (%s)",
419                  source, source->fd, source->name);
420
421         if (!source->iovw.count) {
422                 log_warning("Entry with no payload, skipping");
423                 goto freeing;
424         }
425
426         assert(source->iovw.iovec);
427         assert(source->iovw.count);
428
429         r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
430         if (r < 0)
431                 log_error("Failed to write entry of %zu bytes: %s",
432                           iovw_size(&source->iovw), strerror(-r));
433         else
434                 r = 1;
435
436  freeing:
437         iovw_free_contents(&source->iovw);
438         return r;
439 }