chiark / gitweb /
journal-remote: rework fd and writer reference handling
[elogind.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 8*1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0 && !source->passive_fd) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 safe_close(source->fd);
34         }
35
36         free(source->name);
37         free(source->buf);
38         iovw_free_contents(&source->iovw);
39
40         log_debug("Writer ref count %u", source->writer->n_ref);
41         writer_unref(source->writer);
42
43         sd_event_source_unref(source->event);
44
45         free(source);
46 }
47
48 /**
49  * Initialize zero-filled source with given values. On success, takes
50  * ownerhship of fd and writer, otherwise does not touch them.
51  */
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
53
54         RemoteSource *source;
55
56         log_debug("Creating source for %sfd:%d (%s)",
57                   passive_fd ? "passive " : "", fd, name);
58
59         assert(fd >= 0);
60
61         source = new0(RemoteSource, 1);
62         if (!source)
63                 return NULL;
64
65         source->fd = fd;
66         source->passive_fd = passive_fd;
67         source->name = name;
68         source->writer = writer;
69
70         return source;
71 }
72
73 static int get_line(RemoteSource *source, char **line, size_t *size) {
74         ssize_t n, remain;
75         char *c = NULL;
76         char *newbuf = NULL;
77         size_t newsize = 0;
78
79         assert(source);
80         assert(source->state == STATE_LINE);
81         assert(source->filled <= source->size);
82         assert(source->buf == NULL || source->size > 0);
83         assert(source->fd >= 0);
84
85         while (true) {
86                 if (source->buf)
87                         c = memchr(source->buf + source->scanned, '\n',
88                                    source->filled - source->scanned);
89                 if (c != NULL)
90                         break;
91
92                 source->scanned = source->filled;
93                 if (source->scanned >= DATA_SIZE_MAX) {
94                         log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
95                         return -E2BIG;
96                 }
97
98                 if (source->passive_fd)
99                         /* we have to wait for some data to come to us */
100                         return -EWOULDBLOCK;
101
102                 if (source->size - source->filled < LINE_CHUNK &&
103                     !GREEDY_REALLOC(source->buf, source->size,
104                                     MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
105                                 return log_oom();
106
107                 assert(source->size - source->filled >= LINE_CHUNK ||
108                        source->size == DATA_SIZE_MAX);
109
110                 // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
111                 // to accomodate such big fields.
112
113                 n = read(source->fd, source->buf + source->filled,
114                          source->size - source->filled);
115                 if (n < 0) {
116                         if (errno != EAGAIN && errno != EWOULDBLOCK)
117                                 log_error("read(%d, ..., %zd): %m", source->fd,
118                                           source->size - source->filled);
119                         return -errno;
120                 } else if (n == 0)
121                         return 0;
122
123                 source->filled += n;
124         }
125
126         *line = source->buf;
127         *size = c + 1 - source->buf;
128
129         /* Check if something remains */
130         remain = source->buf + source->filled - c - 1;
131         assert(remain >= 0);
132         if (remain) {
133                 newsize = MAX(remain, LINE_CHUNK);
134                 newbuf = malloc(newsize);
135                 if (!newbuf)
136                         return log_oom();
137                 memcpy(newbuf, c + 1, remain);
138         }
139         source->buf = newbuf;
140         source->size = newsize;
141         source->filled = remain;
142         source->scanned = 0;
143
144         return 1;
145 }
146
147 int push_data(RemoteSource *source, const char *data, size_t size) {
148         assert(source);
149         assert(source->state != STATE_EOF);
150
151         if (!GREEDY_REALLOC(source->buf, source->size,
152                             source->filled + size)) {
153                 log_error("Failed to store received data of size %zu "
154                           "(in addition to existing %zu bytes with %zu filled): %s",
155                           size, source->size, source->filled, strerror(ENOMEM));
156                 return -ENOMEM;
157         }
158
159         memcpy(source->buf + source->filled, data, size);
160         source->filled += size;
161
162         return 0;
163 }
164
165 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
166         int n;
167         char *newbuf = NULL;
168         size_t newsize = 0, remain;
169
170         assert(source);
171         assert(source->state == STATE_DATA_START ||
172                source->state == STATE_DATA ||
173                source->state == STATE_DATA_FINISH);
174         assert(size <= DATA_SIZE_MAX);
175         assert(source->filled <= source->size);
176         assert(source->scanned <= source->filled);
177         assert(source->buf != NULL || source->size == 0);
178         assert(source->buf == NULL || source->size > 0);
179         assert(source->fd >= 0);
180         assert(data);
181
182         while(source->filled < size) {
183                 if (source->passive_fd)
184                         /* we have to wait for some data to come to us */
185                         return -EWOULDBLOCK;
186
187                 if (!GREEDY_REALLOC(source->buf, source->size, size))
188                         return log_oom();
189
190                 n = read(source->fd, source->buf + source->filled,
191                          source->size - source->filled);
192                 if (n < 0) {
193                         if (errno != EAGAIN && errno != EWOULDBLOCK)
194                                 log_error("read(%d, ..., %zd): %m", source->fd,
195                                           source->size - source->filled);
196                         return -errno;
197                 } else if (n == 0)
198                         return 0;
199
200                 source->filled += n;
201         }
202
203         *data = source->buf;
204
205         /* Check if something remains */
206         assert(size <= source->filled);
207         remain = source->filled - size;
208         if (remain) {
209                 newsize = MAX(remain, LINE_CHUNK);
210                 newbuf = malloc(newsize);
211                 if (!newbuf)
212                         return log_oom();
213                 memcpy(newbuf, source->buf + size, remain);
214         }
215         source->buf = newbuf;
216         source->size = newsize;
217         source->filled = remain;
218         source->scanned = 0;
219
220         return 1;
221 }
222
223 static int get_data_size(RemoteSource *source) {
224         int r;
225         _cleanup_free_ void *data = NULL;
226
227         assert(source);
228         assert(source->state == STATE_DATA_START);
229         assert(source->data_size == 0);
230
231         r = fill_fixed_size(source, &data, sizeof(uint64_t));
232         if (r <= 0)
233                 return r;
234
235         source->data_size = le64toh( *(uint64_t *) data );
236         if (source->data_size > DATA_SIZE_MAX) {
237                 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
238                           source->data_size, DATA_SIZE_MAX);
239                 return -EINVAL;
240         }
241         if (source->data_size == 0)
242                 log_warning("Binary field with zero length");
243
244         return 1;
245 }
246
247 static int get_data_data(RemoteSource *source, void **data) {
248         int r;
249
250         assert(source);
251         assert(data);
252         assert(source->state == STATE_DATA);
253
254         r = fill_fixed_size(source, data, source->data_size);
255         if (r <= 0)
256                 return r;
257
258         return 1;
259 }
260
261 static int get_data_newline(RemoteSource *source) {
262         int r;
263         _cleanup_free_ char *data = NULL;
264
265         assert(source);
266         assert(source->state == STATE_DATA_FINISH);
267
268         r = fill_fixed_size(source, (void**) &data, 1);
269         if (r <= 0)
270                 return r;
271
272         assert(data);
273         if (*data != '\n') {
274                 log_error("expected newline, got '%c'", *data);
275                 return -EINVAL;
276         }
277
278         return 1;
279 }
280
281 static int process_dunder(RemoteSource *source, char *line, size_t n) {
282         const char *timestamp;
283         int r;
284
285         assert(line);
286         assert(n > 0);
287         assert(line[n-1] == '\n');
288
289         /* XXX: is it worth to support timestamps in extended format?
290          * We don't produce them, but who knows... */
291
292         timestamp = startswith(line, "__CURSOR=");
293         if (timestamp)
294                 /* ignore __CURSOR */
295                 return 1;
296
297         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
298         if (timestamp) {
299                 long long unsigned x;
300                 line[n-1] = '\0';
301                 r = safe_atollu(timestamp, &x);
302                 if (r < 0)
303                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
304                 else
305                         source->ts.realtime = x;
306                 return r < 0 ? r : 1;
307         }
308
309         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
310         if (timestamp) {
311                 long long unsigned x;
312                 line[n-1] = '\0';
313                 r = safe_atollu(timestamp, &x);
314                 if (r < 0)
315                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
316                 else
317                         source->ts.monotonic = x;
318                 return r < 0 ? r : 1;
319         }
320
321         timestamp = startswith(line, "__");
322         if (timestamp) {
323                 log_notice("Unknown dunder line %s", line);
324                 return 1;
325         }
326
327         /* no dunder */
328         return 0;
329 }
330
331 int process_data(RemoteSource *source) {
332         int r;
333
334         switch(source->state) {
335         case STATE_LINE: {
336                 char *line, *sep;
337                 size_t n;
338
339                 assert(source->data_size == 0);
340
341                 r = get_line(source, &line, &n);
342                 if (r < 0)
343                         return r;
344                 if (r == 0) {
345                         source->state = STATE_EOF;
346                         return r;
347                 }
348                 assert(n > 0);
349                 assert(line[n-1] == '\n');
350
351                 if (n == 1) {
352                         log_debug("Received empty line, event is ready");
353                         free(line);
354                         return 1;
355                 }
356
357                 r = process_dunder(source, line, n);
358                 if (r != 0) {
359                         free(line);
360                         return r < 0 ? r : 0;
361                 }
362
363                 /* MESSAGE=xxx\n
364                    or
365                    COREDUMP\n
366                    LLLLLLLL0011223344...\n
367                 */
368                 sep = memchr(line, '=', n);
369                 if (sep)
370                         /* chomp newline */
371                         n--;
372                 else
373                         /* replace \n with = */
374                         line[n-1] = '=';
375                 log_debug("Received: %.*s", (int) n, line);
376
377                 r = iovw_put(&source->iovw, line, n);
378                 if (r < 0) {
379                         log_error("Failed to put line in iovect");
380                         free(line);
381                         return r;
382                 }
383
384                 if (!sep)
385                         source->state = STATE_DATA_START;
386                 return 0; /* continue */
387         }
388
389         case STATE_DATA_START:
390                 assert(source->data_size == 0);
391
392                 r = get_data_size(source);
393                 log_debug("get_data_size() -> %d", r);
394                 if (r < 0)
395                         return r;
396                 if (r == 0) {
397                         source->state = STATE_EOF;
398                         return 0;
399                 }
400
401                 source->state = source->data_size > 0 ?
402                         STATE_DATA : STATE_DATA_FINISH;
403
404                 return 0; /* continue */
405
406         case STATE_DATA: {
407                 void *data;
408
409                 assert(source->data_size > 0);
410
411                 r = get_data_data(source, &data);
412                 log_debug("get_data_data() -> %d", r);
413                 if (r < 0)
414                         return r;
415                 if (r == 0) {
416                         source->state = STATE_EOF;
417                         return 0;
418                 }
419
420                 assert(data);
421
422                 r = iovw_put(&source->iovw, data, source->data_size);
423                 if (r < 0) {
424                         log_error("failed to put binary buffer in iovect");
425                         return r;
426                 }
427
428                 source->state = STATE_DATA_FINISH;
429
430                 return 0; /* continue */
431         }
432
433         case STATE_DATA_FINISH:
434                 r = get_data_newline(source);
435                 log_debug("get_data_newline() -> %d", r);
436                 if (r < 0)
437                         return r;
438                 if (r == 0) {
439                         source->state = STATE_EOF;
440                         return 0;
441                 }
442
443                 source->data_size = 0;
444                 source->state = STATE_LINE;
445
446                 return 0; /* continue */
447         default:
448                 assert_not_reached("wtf?");
449         }
450 }
451
452 int process_source(RemoteSource *source, bool compress, bool seal) {
453         int r;
454
455         assert(source);
456         assert(source->writer);
457
458         r = process_data(source);
459         if (r <= 0)
460                 return r;
461
462         /* We have a full event */
463         log_debug("Received a full event from source@%p fd:%d (%s)",
464                   source, source->fd, source->name);
465
466         if (!source->iovw.count) {
467                 log_warning("Entry with no payload, skipping");
468                 goto freeing;
469         }
470
471         assert(source->iovw.iovec);
472         assert(source->iovw.count);
473
474         r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
475         if (r < 0)
476                 log_error("Failed to write entry of %zu bytes: %s",
477                           iovw_size(&source->iovw), strerror(-r));
478         else
479                 r = 1;
480
481  freeing:
482         iovw_free_contents(&source->iovw);
483         return r;
484 }