chiark / gitweb /
6c096de03aa30e110046f805ee3ec2bdc3e84b0b
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 8*1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0 && !source->passive_fd) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 safe_close(source->fd);
34         }
35
36         free(source->name);
37         free(source->buf);
38         iovw_free_contents(&source->iovw);
39
40         log_debug("Writer ref count %i", source->writer->n_ref);
41         writer_unref(source->writer);
42
43         sd_event_source_unref(source->event);
44
45         free(source);
46 }
47
48 /**
49  * Initialize zero-filled source with given values. On success, takes
50  * ownerhship of fd and writer, otherwise does not touch them.
51  */
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
53
54         RemoteSource *source;
55
56         log_debug("Creating source for %sfd:%d (%s)",
57                   passive_fd ? "passive " : "", fd, name);
58
59         assert(fd >= 0);
60
61         source = new0(RemoteSource, 1);
62         if (!source)
63                 return NULL;
64
65         source->fd = fd;
66         source->passive_fd = passive_fd;
67         source->name = name;
68         source->writer = writer;
69
70         return source;
71 }
72
73 static char* realloc_buffer(RemoteSource *source, size_t size) {
74         char *b, *old = source->buf;
75
76         b = GREEDY_REALLOC(source->buf, source->size, size);
77         if (!b)
78                 return NULL;
79
80         iovw_rebase(&source->iovw, old, source->buf);
81
82         return b;
83 }
84
85 static int get_line(RemoteSource *source, char **line, size_t *size) {
86         ssize_t n;
87         char *c = NULL;
88
89         assert(source);
90         assert(source->state == STATE_LINE);
91         assert(source->offset <= source->filled);
92         assert(source->filled <= source->size);
93         assert(source->buf == NULL || source->size > 0);
94         assert(source->fd >= 0);
95
96         while (true) {
97                 if (source->buf) {
98                         size_t start = MAX(source->scanned, source->offset);
99
100                         c = memchr(source->buf + start, '\n',
101                                    source->filled - start);
102                         if (c != NULL)
103                                 break;
104                 }
105
106                 source->scanned = source->filled;
107                 if (source->scanned >= DATA_SIZE_MAX) {
108                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
109                         return -E2BIG;
110                 }
111
112                 if (source->passive_fd)
113                         /* we have to wait for some data to come to us */
114                         return -EWOULDBLOCK;
115
116                 if (source->size - source->filled < LINE_CHUNK &&
117                     !realloc_buffer(source,
118                                     MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
119                                 return log_oom();
120
121                 assert(source->size - source->filled >= LINE_CHUNK ||
122                        source->size == ENTRY_SIZE_MAX);
123
124                 n = read(source->fd, source->buf + source->filled,
125                          source->size - source->filled);
126                 if (n < 0) {
127                         if (errno != EAGAIN && errno != EWOULDBLOCK)
128                                 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
129                                                 source->size - source->filled);
130                         return -errno;
131                 } else if (n == 0)
132                         return 0;
133
134                 source->filled += n;
135         }
136
137         *line = source->buf + source->offset;
138         *size = c + 1 - source->buf - source->offset;
139         source->offset += *size;
140
141         return 1;
142 }
143
144 int push_data(RemoteSource *source, const char *data, size_t size) {
145         assert(source);
146         assert(source->state != STATE_EOF);
147
148         if (!realloc_buffer(source, source->filled + size)) {
149                 log_error("Failed to store received data of size %zu "
150                           "(in addition to existing %zu bytes with %zu filled): %s",
151                           size, source->size, source->filled, strerror(ENOMEM));
152                 return -ENOMEM;
153         }
154
155         memcpy(source->buf + source->filled, data, size);
156         source->filled += size;
157
158         return 0;
159 }
160
161 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
162
163         assert(source);
164         assert(source->state == STATE_DATA_START ||
165                source->state == STATE_DATA ||
166                source->state == STATE_DATA_FINISH);
167         assert(size <= DATA_SIZE_MAX);
168         assert(source->offset <= source->filled);
169         assert(source->filled <= source->size);
170         assert(source->buf != NULL || source->size == 0);
171         assert(source->buf == NULL || source->size > 0);
172         assert(source->fd >= 0);
173         assert(data);
174
175         while (source->filled - source->offset < size) {
176                 int n;
177
178                 if (source->passive_fd)
179                         /* we have to wait for some data to come to us */
180                         return -EWOULDBLOCK;
181
182                 if (!realloc_buffer(source, source->offset + size))
183                         return log_oom();
184
185                 n = read(source->fd, source->buf + source->filled,
186                          source->size - source->filled);
187                 if (n < 0) {
188                         if (errno != EAGAIN && errno != EWOULDBLOCK)
189                                 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
190                                                 source->size - source->filled);
191                         return -errno;
192                 } else if (n == 0)
193                         return 0;
194
195                 source->filled += n;
196         }
197
198         *data = source->buf + source->offset;
199         source->offset += size;
200
201         return 1;
202 }
203
204 static int get_data_size(RemoteSource *source) {
205         int r;
206         void *data;
207
208         assert(source);
209         assert(source->state == STATE_DATA_START);
210         assert(source->data_size == 0);
211
212         r = fill_fixed_size(source, &data, sizeof(uint64_t));
213         if (r <= 0)
214                 return r;
215
216         source->data_size = le64toh( *(uint64_t *) data );
217         if (source->data_size > DATA_SIZE_MAX) {
218                 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
219                           source->data_size, DATA_SIZE_MAX);
220                 return -EINVAL;
221         }
222         if (source->data_size == 0)
223                 log_warning("Binary field with zero length");
224
225         return 1;
226 }
227
228 static int get_data_data(RemoteSource *source, void **data) {
229         int r;
230
231         assert(source);
232         assert(data);
233         assert(source->state == STATE_DATA);
234
235         r = fill_fixed_size(source, data, source->data_size);
236         if (r <= 0)
237                 return r;
238
239         return 1;
240 }
241
242 static int get_data_newline(RemoteSource *source) {
243         int r;
244         char *data;
245
246         assert(source);
247         assert(source->state == STATE_DATA_FINISH);
248
249         r = fill_fixed_size(source, (void**) &data, 1);
250         if (r <= 0)
251                 return r;
252
253         assert(data);
254         if (*data != '\n') {
255                 log_error("expected newline, got '%c'", *data);
256                 return -EINVAL;
257         }
258
259         return 1;
260 }
261
262 static int process_dunder(RemoteSource *source, char *line, size_t n) {
263         const char *timestamp;
264         int r;
265
266         assert(line);
267         assert(n > 0);
268         assert(line[n-1] == '\n');
269
270         /* XXX: is it worth to support timestamps in extended format?
271          * We don't produce them, but who knows... */
272
273         timestamp = startswith(line, "__CURSOR=");
274         if (timestamp)
275                 /* ignore __CURSOR */
276                 return 1;
277
278         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
279         if (timestamp) {
280                 long long unsigned x;
281                 line[n-1] = '\0';
282                 r = safe_atollu(timestamp, &x);
283                 if (r < 0)
284                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
285                 else
286                         source->ts.realtime = x;
287                 return r < 0 ? r : 1;
288         }
289
290         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
291         if (timestamp) {
292                 long long unsigned x;
293                 line[n-1] = '\0';
294                 r = safe_atollu(timestamp, &x);
295                 if (r < 0)
296                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
297                 else
298                         source->ts.monotonic = x;
299                 return r < 0 ? r : 1;
300         }
301
302         timestamp = startswith(line, "__");
303         if (timestamp) {
304                 log_notice("Unknown dunder line %s", line);
305                 return 1;
306         }
307
308         /* no dunder */
309         return 0;
310 }
311
312 int process_data(RemoteSource *source) {
313         int r;
314
315         switch(source->state) {
316         case STATE_LINE: {
317                 char *line, *sep;
318                 size_t n;
319
320                 assert(source->data_size == 0);
321
322                 r = get_line(source, &line, &n);
323                 if (r < 0)
324                         return r;
325                 if (r == 0) {
326                         source->state = STATE_EOF;
327                         return r;
328                 }
329                 assert(n > 0);
330                 assert(line[n-1] == '\n');
331
332                 if (n == 1) {
333                         log_trace("Received empty line, event is ready");
334                         return 1;
335                 }
336
337                 r = process_dunder(source, line, n);
338                 if (r != 0)
339                         return r < 0 ? r : 0;
340
341                 /* MESSAGE=xxx\n
342                    or
343                    COREDUMP\n
344                    LLLLLLLL0011223344...\n
345                 */
346                 sep = memchr(line, '=', n);
347                 if (sep) {
348                         /* chomp newline */
349                         n--;
350
351                         r = iovw_put(&source->iovw, line, n);
352                         if (r < 0)
353                                 return r;
354                 } else {
355                         /* replace \n with = */
356                         line[n-1] = '=';
357
358                         source->field_len = n;
359                         source->state = STATE_DATA_START;
360
361                         /* we cannot put the field in iovec until we have all data */
362                 }
363
364                 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
365
366                 return 0; /* continue */
367         }
368
369         case STATE_DATA_START:
370                 assert(source->data_size == 0);
371
372                 r = get_data_size(source);
373                 // log_debug("get_data_size() -> %d", r);
374                 if (r < 0)
375                         return r;
376                 if (r == 0) {
377                         source->state = STATE_EOF;
378                         return 0;
379                 }
380
381                 source->state = source->data_size > 0 ?
382                         STATE_DATA : STATE_DATA_FINISH;
383
384                 return 0; /* continue */
385
386         case STATE_DATA: {
387                 void *data;
388                 char *field;
389
390                 assert(source->data_size > 0);
391
392                 r = get_data_data(source, &data);
393                 // log_debug("get_data_data() -> %d", r);
394                 if (r < 0)
395                         return r;
396                 if (r == 0) {
397                         source->state = STATE_EOF;
398                         return 0;
399                 }
400
401                 assert(data);
402
403                 field = (char*) data - sizeof(uint64_t) - source->field_len;
404                 memmove(field + sizeof(uint64_t), field, source->field_len);
405
406                 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
407                 if (r < 0)
408                         return r;
409
410                 source->state = STATE_DATA_FINISH;
411
412                 return 0; /* continue */
413         }
414
415         case STATE_DATA_FINISH:
416                 r = get_data_newline(source);
417                 // log_debug("get_data_newline() -> %d", r);
418                 if (r < 0)
419                         return r;
420                 if (r == 0) {
421                         source->state = STATE_EOF;
422                         return 0;
423                 }
424
425                 source->data_size = 0;
426                 source->state = STATE_LINE;
427
428                 return 0; /* continue */
429         default:
430                 assert_not_reached("wtf?");
431         }
432 }
433
434 int process_source(RemoteSource *source, bool compress, bool seal) {
435         size_t remain, target;
436         int r;
437
438         assert(source);
439         assert(source->writer);
440
441         r = process_data(source);
442         if (r <= 0)
443                 return r;
444
445         /* We have a full event */
446         log_trace("Received full event from source@%p fd:%d (%s)",
447                   source, source->fd, source->name);
448
449         if (!source->iovw.count) {
450                 log_warning("Entry with no payload, skipping");
451                 goto freeing;
452         }
453
454         assert(source->iovw.iovec);
455         assert(source->iovw.count);
456
457         r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
458         if (r < 0)
459                 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
460                                 iovw_size(&source->iovw));
461         else
462                 r = 1;
463
464  freeing:
465         iovw_free_contents(&source->iovw);
466
467         /* possibly reset buffer position */
468         remain = source->filled - source->offset;
469
470         if (remain == 0) /* no brainer */
471                 source->offset = source->scanned = source->filled = 0;
472         else if (source->offset > source->size - source->filled &&
473                  source->offset > remain) {
474                 memcpy(source->buf, source->buf + source->offset, remain);
475                 source->offset = source->scanned = 0;
476                 source->filled = remain;
477         }
478
479         target = source->size;
480         while (target > 16 * LINE_CHUNK && remain < target / 2)
481                 target /= 2;
482         if (target < source->size) {
483                 char *tmp;
484
485                 tmp = realloc(source->buf, target);
486                 if (!tmp)
487                         log_warning("Failed to reallocate buffer to (smaller) size %zu",
488                                     target);
489                 else {
490                         log_debug("Reallocated buffer from %zu to %zu bytes",
491                                   source->size, target);
492                         source->buf = tmp;
493                         source->size = target;
494                 }
495         }
496
497         return r;
498 }