chiark / gitweb /
treewide: more log_*_errno() conversions, multiline calls
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 8*1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0 && !source->passive_fd) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 safe_close(source->fd);
34         }
35
36         free(source->name);
37         free(source->buf);
38         iovw_free_contents(&source->iovw);
39
40         log_debug("Writer ref count %u", source->writer->n_ref);
41         writer_unref(source->writer);
42
43         sd_event_source_unref(source->event);
44
45         free(source);
46 }
47
48 /**
49  * Initialize zero-filled source with given values. On success, takes
50  * ownerhship of fd and writer, otherwise does not touch them.
51  */
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
53
54         RemoteSource *source;
55
56         log_debug("Creating source for %sfd:%d (%s)",
57                   passive_fd ? "passive " : "", fd, name);
58
59         assert(fd >= 0);
60
61         source = new0(RemoteSource, 1);
62         if (!source)
63                 return NULL;
64
65         source->fd = fd;
66         source->passive_fd = passive_fd;
67         source->name = name;
68         source->writer = writer;
69
70         return source;
71 }
72
73 static char* realloc_buffer(RemoteSource *source, size_t size) {
74         char *b, *old = source->buf;
75
76         b = GREEDY_REALLOC(source->buf, source->size, size);
77         if (!b)
78                 return NULL;
79
80         iovw_rebase(&source->iovw, old, source->buf);
81
82         return b;
83 }
84
85 static int get_line(RemoteSource *source, char **line, size_t *size) {
86         ssize_t n;
87         char *c = NULL;
88
89         assert(source);
90         assert(source->state == STATE_LINE);
91         assert(source->offset <= source->filled);
92         assert(source->filled <= source->size);
93         assert(source->buf == NULL || source->size > 0);
94         assert(source->fd >= 0);
95
96         while (true) {
97                 if (source->buf) {
98                         size_t start = MAX(source->scanned, source->offset);
99
100                         c = memchr(source->buf + start, '\n',
101                                    source->filled - start);
102                         if (c != NULL)
103                                 break;
104                 }
105
106                 source->scanned = source->filled;
107                 if (source->scanned >= DATA_SIZE_MAX) {
108                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
109                         return -E2BIG;
110                 }
111
112                 if (source->passive_fd)
113                         /* we have to wait for some data to come to us */
114                         return -EWOULDBLOCK;
115
116                 if (source->size - source->filled < LINE_CHUNK &&
117                     !realloc_buffer(source,
118                                     MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
119                                 return log_oom();
120
121                 assert(source->size - source->filled >= LINE_CHUNK ||
122                        source->size == ENTRY_SIZE_MAX);
123
124                 n = read(source->fd, source->buf + source->filled,
125                          source->size - source->filled);
126                 if (n < 0) {
127                         if (errno != EAGAIN && errno != EWOULDBLOCK)
128                                 log_error("read(%d, ..., %zd): %m", source->fd,
129                                           source->size - source->filled);
130                         return -errno;
131                 } else if (n == 0)
132                         return 0;
133
134                 source->filled += n;
135         }
136
137         *line = source->buf + source->offset;
138         *size = c + 1 - source->buf - source->offset;
139         source->offset += *size;
140
141         return 1;
142 }
143
144 int push_data(RemoteSource *source, const char *data, size_t size) {
145         assert(source);
146         assert(source->state != STATE_EOF);
147
148         if (!realloc_buffer(source, source->filled + size)) {
149                 log_error("Failed to store received data of size %zu "
150                           "(in addition to existing %zu bytes with %zu filled): %s",
151                           size, source->size, source->filled, strerror(ENOMEM));
152                 return -ENOMEM;
153         }
154
155         memcpy(source->buf + source->filled, data, size);
156         source->filled += size;
157
158         return 0;
159 }
160
161 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
162
163         assert(source);
164         assert(source->state == STATE_DATA_START ||
165                source->state == STATE_DATA ||
166                source->state == STATE_DATA_FINISH);
167         assert(size <= DATA_SIZE_MAX);
168         assert(source->offset <= source->filled);
169         assert(source->filled <= source->size);
170         assert(source->buf != NULL || source->size == 0);
171         assert(source->buf == NULL || source->size > 0);
172         assert(source->fd >= 0);
173         assert(data);
174
175         while (source->filled - source->offset < size) {
176                 int n;
177
178                 if (source->passive_fd)
179                         /* we have to wait for some data to come to us */
180                         return -EWOULDBLOCK;
181
182                 if (!realloc_buffer(source, source->offset + size))
183                         return log_oom();
184
185                 n = read(source->fd, source->buf + source->filled,
186                          source->size - source->filled);
187                 if (n < 0) {
188                         if (errno != EAGAIN && errno != EWOULDBLOCK)
189                                 log_error("read(%d, ..., %zd): %m", source->fd,
190                                           source->size - source->filled);
191                         return -errno;
192                 } else if (n == 0)
193                         return 0;
194
195                 source->filled += n;
196         }
197
198         *data = source->buf + source->offset;
199         source->offset += size;
200
201         return 1;
202 }
203
204 static int get_data_size(RemoteSource *source) {
205         int r;
206         void *data;
207
208         assert(source);
209         assert(source->state == STATE_DATA_START);
210         assert(source->data_size == 0);
211
212         r = fill_fixed_size(source, &data, sizeof(uint64_t));
213         if (r <= 0)
214                 return r;
215
216         source->data_size = le64toh( *(uint64_t *) data );
217         if (source->data_size > DATA_SIZE_MAX) {
218                 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
219                           source->data_size, DATA_SIZE_MAX);
220                 return -EINVAL;
221         }
222         if (source->data_size == 0)
223                 log_warning("Binary field with zero length");
224
225         return 1;
226 }
227
228 static int get_data_data(RemoteSource *source, void **data) {
229         int r;
230
231         assert(source);
232         assert(data);
233         assert(source->state == STATE_DATA);
234
235         r = fill_fixed_size(source, data, source->data_size);
236         if (r <= 0)
237                 return r;
238
239         return 1;
240 }
241
242 static int get_data_newline(RemoteSource *source) {
243         int r;
244         char *data;
245
246         assert(source);
247         assert(source->state == STATE_DATA_FINISH);
248
249         r = fill_fixed_size(source, (void**) &data, 1);
250         if (r <= 0)
251                 return r;
252
253         assert(data);
254         if (*data != '\n') {
255                 log_error("expected newline, got '%c'", *data);
256                 return -EINVAL;
257         }
258
259         return 1;
260 }
261
262 static int process_dunder(RemoteSource *source, char *line, size_t n) {
263         const char *timestamp;
264         int r;
265
266         assert(line);
267         assert(n > 0);
268         assert(line[n-1] == '\n');
269
270         /* XXX: is it worth to support timestamps in extended format?
271          * We don't produce them, but who knows... */
272
273         timestamp = startswith(line, "__CURSOR=");
274         if (timestamp)
275                 /* ignore __CURSOR */
276                 return 1;
277
278         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
279         if (timestamp) {
280                 long long unsigned x;
281                 line[n-1] = '\0';
282                 r = safe_atollu(timestamp, &x);
283                 if (r < 0)
284                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
285                 else
286                         source->ts.realtime = x;
287                 return r < 0 ? r : 1;
288         }
289
290         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
291         if (timestamp) {
292                 long long unsigned x;
293                 line[n-1] = '\0';
294                 r = safe_atollu(timestamp, &x);
295                 if (r < 0)
296                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
297                 else
298                         source->ts.monotonic = x;
299                 return r < 0 ? r : 1;
300         }
301
302         timestamp = startswith(line, "__");
303         if (timestamp) {
304                 log_notice("Unknown dunder line %s", line);
305                 return 1;
306         }
307
308         /* no dunder */
309         return 0;
310 }
311
312 int process_data(RemoteSource *source) {
313         int r;
314
315         switch(source->state) {
316         case STATE_LINE: {
317                 char *line, *sep;
318                 size_t n;
319
320                 assert(source->data_size == 0);
321
322                 r = get_line(source, &line, &n);
323                 if (r < 0)
324                         return r;
325                 if (r == 0) {
326                         source->state = STATE_EOF;
327                         return r;
328                 }
329                 assert(n > 0);
330                 assert(line[n-1] == '\n');
331
332                 if (n == 1) {
333                         log_trace("Received empty line, event is ready");
334                         return 1;
335                 }
336
337                 r = process_dunder(source, line, n);
338                 if (r != 0)
339                         return r < 0 ? r : 0;
340
341                 /* MESSAGE=xxx\n
342                    or
343                    COREDUMP\n
344                    LLLLLLLL0011223344...\n
345                 */
346                 sep = memchr(line, '=', n);
347                 if (sep)
348                         /* chomp newline */
349                         n--;
350                 else
351                         /* replace \n with = */
352                         line[n-1] = '=';
353                 log_trace("Received: %.*s", (int) n, line);
354
355                 r = iovw_put(&source->iovw, line, n);
356                 if (r < 0) {
357                         log_error("Failed to put line in iovect");
358                         return r;
359                 }
360
361                 if (!sep)
362                         source->state = STATE_DATA_START;
363                 return 0; /* continue */
364         }
365
366         case STATE_DATA_START:
367                 assert(source->data_size == 0);
368
369                 r = get_data_size(source);
370                 // log_debug("get_data_size() -> %d", r);
371                 if (r < 0)
372                         return r;
373                 if (r == 0) {
374                         source->state = STATE_EOF;
375                         return 0;
376                 }
377
378                 source->state = source->data_size > 0 ?
379                         STATE_DATA : STATE_DATA_FINISH;
380
381                 return 0; /* continue */
382
383         case STATE_DATA: {
384                 void *data;
385
386                 assert(source->data_size > 0);
387
388                 r = get_data_data(source, &data);
389                 // log_debug("get_data_data() -> %d", r);
390                 if (r < 0)
391                         return r;
392                 if (r == 0) {
393                         source->state = STATE_EOF;
394                         return 0;
395                 }
396
397                 assert(data);
398
399                 r = iovw_put(&source->iovw, data, source->data_size);
400                 if (r < 0) {
401                         log_error("failed to put binary buffer in iovect");
402                         return r;
403                 }
404
405                 source->state = STATE_DATA_FINISH;
406
407                 return 0; /* continue */
408         }
409
410         case STATE_DATA_FINISH:
411                 r = get_data_newline(source);
412                 // log_debug("get_data_newline() -> %d", r);
413                 if (r < 0)
414                         return r;
415                 if (r == 0) {
416                         source->state = STATE_EOF;
417                         return 0;
418                 }
419
420                 source->data_size = 0;
421                 source->state = STATE_LINE;
422
423                 return 0; /* continue */
424         default:
425                 assert_not_reached("wtf?");
426         }
427 }
428
429 int process_source(RemoteSource *source, bool compress, bool seal) {
430         size_t remain, target;
431         int r;
432
433         assert(source);
434         assert(source->writer);
435
436         r = process_data(source);
437         if (r <= 0)
438                 return r;
439
440         /* We have a full event */
441         log_trace("Received a full event from source@%p fd:%d (%s)",
442                   source, source->fd, source->name);
443
444         if (!source->iovw.count) {
445                 log_warning("Entry with no payload, skipping");
446                 goto freeing;
447         }
448
449         assert(source->iovw.iovec);
450         assert(source->iovw.count);
451
452         r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
453         if (r < 0)
454                 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
455                                 iovw_size(&source->iovw));
456         else
457                 r = 1;
458
459  freeing:
460         iovw_free_contents(&source->iovw);
461
462         /* possibly reset buffer position */
463         remain = source->filled - source->offset;
464
465         if (remain == 0) /* no brainer */
466                 source->offset = source->scanned = source->filled = 0;
467         else if (source->offset > source->size - source->filled &&
468                  source->offset > remain) {
469                 memcpy(source->buf, source->buf + source->offset, remain);
470                 source->offset = source->scanned = 0;
471                 source->filled = remain;
472         }
473
474         target = source->size;
475         while (target > 16 * LINE_CHUNK && remain < target / 2)
476                 target /= 2;
477         if (target < source->size) {
478                 char *tmp;
479
480                 tmp = realloc(source->buf, target);
481                 if (!tmp)
482                         log_warning("Failed to reallocate buffer to (smaller) size %zu",
483                                     target);
484                 else {
485                         log_debug("Reallocated buffer from %zu to %zu bytes",
486                                   source->size, target);
487                         source->buf = tmp;
488                         source->size = target;
489                 }
490         }
491
492         return r;
493 }