chiark / gitweb /
util: replace close_nointr_nofail() by a more useful safe_close()
[elogind.git] / src / journal / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 close(source->fd);
34         }
35         free(source->name);
36         free(source->buf);
37         iovw_free_contents(&source->iovw);
38         free(source);
39 }
40
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
42         ssize_t n, remain;
43         char *c;
44         char *newbuf = NULL;
45         size_t newsize = 0;
46
47         assert(source);
48         assert(source->state == STATE_LINE);
49         assert(source->filled <= source->size);
50         assert(source->buf == NULL || source->size > 0);
51
52         c = memchr(source->buf, '\n', source->filled);
53         if (c != NULL)
54                 goto docopy;
55
56  resize:
57         if (source->fd < 0)
58                 /* we have to wait for some data to come to us */
59                 return -EWOULDBLOCK;
60
61         if (source->size - source->filled < LINE_CHUNK) {
62                 // XXX: add check for maximum line length
63
64                 if (!GREEDY_REALLOC(source->buf, source->size,
65                                     source->filled + LINE_CHUNK))
66                         return log_oom();
67         }
68         assert(source->size - source->filled >= LINE_CHUNK);
69
70         n = read(source->fd, source->buf + source->filled,
71                  source->size - source->filled);
72         if (n < 0) {
73                 if (errno != EAGAIN && errno != EWOULDBLOCK)
74                         log_error("read(%d, ..., %zd): %m", source->fd,
75                                   source->size - source->filled);
76                 return -errno;
77         } else if (n == 0)
78                 return 0;
79
80         c = memchr(source->buf + source->filled, '\n', n);
81         source->filled += n;
82
83         if (c == NULL)
84                 goto resize;
85
86  docopy:
87         *line = source->buf;
88         *size = c + 1 - source->buf;
89
90         /* Check if something remains */
91         remain = source->buf + source->filled - c - 1;
92         assert(remain >= 0);
93         if (remain) {
94                 newsize = MAX(remain, LINE_CHUNK);
95                 newbuf = malloc(newsize);
96                 if (!newbuf)
97                         return log_oom();
98                 memcpy(newbuf, c + 1, remain);
99         }
100         source->buf = newbuf;
101         source->size = newsize;
102         source->filled = remain;
103
104         return 1;
105 }
106
107 int push_data(RemoteSource *source, const char *data, size_t size) {
108         assert(source);
109         assert(source->state != STATE_EOF);
110
111         if (!GREEDY_REALLOC(source->buf, source->size,
112                             source->filled + size))
113                 return log_oom();
114
115         memcpy(source->buf + source->filled, data, size);
116         source->filled += size;
117
118         return 0;
119 }
120
121 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
122         int n;
123         char *newbuf = NULL;
124         size_t newsize = 0, remain;
125
126         assert(source);
127         assert(source->state == STATE_DATA_START ||
128                source->state == STATE_DATA ||
129                source->state == STATE_DATA_FINISH);
130         assert(size <= DATA_SIZE_MAX);
131         assert(source->filled <= source->size);
132         assert(source->buf != NULL || source->size == 0);
133         assert(source->buf == NULL || source->size > 0);
134         assert(data);
135
136         while(source->filled < size) {
137                 if (source->fd < 0)
138                         /* we have to wait for some data to come to us */
139                         return -EWOULDBLOCK;
140
141                 if (!GREEDY_REALLOC(source->buf, source->size, size))
142                         return log_oom();
143
144                 n = read(source->fd, source->buf + source->filled,
145                          source->size - source->filled);
146                 if (n < 0) {
147                         if (errno != EAGAIN && errno != EWOULDBLOCK)
148                                 log_error("read(%d, ..., %zd): %m", source->fd,
149                                           source->size - source->filled);
150                         return -errno;
151                 } else if (n == 0)
152                         return 0;
153
154                 source->filled += n;
155         }
156
157         *data = source->buf;
158
159         /* Check if something remains */
160         assert(size <= source->filled);
161         remain = source->filled - size;
162         if (remain) {
163                 newsize = MAX(remain, LINE_CHUNK);
164                 newbuf = malloc(newsize);
165                 if (!newbuf)
166                         return log_oom();
167                 memcpy(newbuf, source->buf + size, remain);
168         }
169         source->buf = newbuf;
170         source->size = newsize;
171         source->filled = remain;
172
173         return 1;
174 }
175
176 static int get_data_size(RemoteSource *source) {
177         int r;
178         void _cleanup_free_ *data = NULL;
179
180         assert(source);
181         assert(source->state == STATE_DATA_START);
182         assert(source->data_size == 0);
183
184         r = fill_fixed_size(source, &data, sizeof(uint64_t));
185         if (r <= 0)
186                 return r;
187
188         source->data_size = le64toh( *(uint64_t *) data );
189         if (source->data_size > DATA_SIZE_MAX) {
190                 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
191                           source->data_size, DATA_SIZE_MAX);
192                 return -EINVAL;
193         }
194         if (source->data_size == 0)
195                 log_warning("Binary field with zero length");
196
197         return 1;
198 }
199
200 static int get_data_data(RemoteSource *source, void **data) {
201         int r;
202
203         assert(source);
204         assert(data);
205         assert(source->state == STATE_DATA);
206
207         r = fill_fixed_size(source, data, source->data_size);
208         if (r <= 0)
209                 return r;
210
211         return 1;
212 }
213
214 static int get_data_newline(RemoteSource *source) {
215         int r;
216         char _cleanup_free_ *data = NULL;
217
218         assert(source);
219         assert(source->state == STATE_DATA_FINISH);
220
221         r = fill_fixed_size(source, (void**) &data, 1);
222         if (r <= 0)
223                 return r;
224
225         assert(data);
226         if (*data != '\n') {
227                 log_error("expected newline, got '%c'", *data);
228                 return -EINVAL;
229         }
230
231         return 1;
232 }
233
234 static int process_dunder(RemoteSource *source, char *line, size_t n) {
235         const char *timestamp;
236         int r;
237
238         assert(line);
239         assert(n > 0);
240         assert(line[n-1] == '\n');
241
242         /* XXX: is it worth to support timestamps in extended format?
243          * We don't produce them, but who knows... */
244
245         timestamp = startswith(line, "__CURSOR=");
246         if (timestamp)
247                 /* ignore __CURSOR */
248                 return 1;
249
250         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
251         if (timestamp) {
252                 long long unsigned x;
253                 line[n-1] = '\0';
254                 r = safe_atollu(timestamp, &x);
255                 if (r < 0)
256                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
257                 else
258                         source->ts.realtime = x;
259                 return r < 0 ? r : 1;
260         }
261
262         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
263         if (timestamp) {
264                 long long unsigned x;
265                 line[n-1] = '\0';
266                 r = safe_atollu(timestamp, &x);
267                 if (r < 0)
268                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
269                 else
270                         source->ts.monotonic = x;
271                 return r < 0 ? r : 1;
272         }
273
274         timestamp = startswith(line, "__");
275         if (timestamp) {
276                 log_notice("Unknown dunder line %s", line);
277                 return 1;
278         }
279
280         /* no dunder */
281         return 0;
282 }
283
284 int process_data(RemoteSource *source) {
285         int r;
286
287         switch(source->state) {
288         case STATE_LINE: {
289                 char *line, *sep;
290                 size_t n;
291
292                 assert(source->data_size == 0);
293
294                 r = get_line(source, &line, &n);
295                 if (r < 0)
296                         return r;
297                 if (r == 0) {
298                         source->state = STATE_EOF;
299                         return r;
300                 }
301                 assert(n > 0);
302                 assert(line[n-1] == '\n');
303
304                 if (n == 1) {
305                         log_debug("Received empty line, event is ready");
306                         free(line);
307                         return 1;
308                 }
309
310                 r = process_dunder(source, line, n);
311                 if (r != 0) {
312                         free(line);
313                         return r < 0 ? r : 0;
314                 }
315
316                 /* MESSAGE=xxx\n
317                    or
318                    COREDUMP\n
319                    LLLLLLLL0011223344...\n
320                 */
321                 sep = memchr(line, '=', n);
322                 if (sep)
323                         /* chomp newline */
324                         n--;
325                 else
326                         /* replace \n with = */
327                         line[n-1] = '=';
328                 log_debug("Received: %.*s", (int) n, line);
329
330                 r = iovw_put(&source->iovw, line, n);
331                 if (r < 0) {
332                         log_error("Failed to put line in iovect");
333                         free(line);
334                         return r;
335                 }
336
337                 if (!sep)
338                         source->state = STATE_DATA_START;
339                 return 0; /* continue */
340         }
341
342         case STATE_DATA_START:
343                 assert(source->data_size == 0);
344
345                 r = get_data_size(source);
346                 log_debug("get_data_size() -> %d", r);
347                 if (r < 0)
348                         return r;
349                 if (r == 0) {
350                         source->state = STATE_EOF;
351                         return 0;
352                 }
353
354                 source->state = source->data_size > 0 ?
355                         STATE_DATA : STATE_DATA_FINISH;
356
357                 return 0; /* continue */
358
359         case STATE_DATA: {
360                 void *data;
361
362                 assert(source->data_size > 0);
363
364                 r = get_data_data(source, &data);
365                 log_debug("get_data_data() -> %d", r);
366                 if (r < 0)
367                         return r;
368                 if (r == 0) {
369                         source->state = STATE_EOF;
370                         return 0;
371                 }
372
373                 assert(data);
374
375                 r = iovw_put(&source->iovw, data, source->data_size);
376                 if (r < 0) {
377                         log_error("failed to put binary buffer in iovect");
378                         return r;
379                 }
380
381                 source->state = STATE_DATA_FINISH;
382
383                 return 0; /* continue */
384         }
385
386         case STATE_DATA_FINISH:
387                 r = get_data_newline(source);
388                 log_debug("get_data_newline() -> %d", r);
389                 if (r < 0)
390                         return r;
391                 if (r == 0) {
392                         source->state = STATE_EOF;
393                         return 0;
394                 }
395
396                 source->data_size = 0;
397                 source->state = STATE_LINE;
398
399                 return 0; /* continue */
400         default:
401                 assert_not_reached("wtf?");
402         }
403 }
404
405 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
406         int r;
407
408         assert(source);
409         assert(writer);
410
411         r = process_data(source);
412         if (r <= 0)
413                 return r;
414
415         /* We have a full event */
416         log_info("Received a full event from source@%p fd:%d (%s)",
417                  source, source->fd, source->name);
418
419         if (!source->iovw.count) {
420                 log_warning("Entry with no payload, skipping");
421                 goto freeing;
422         }
423
424         assert(source->iovw.iovec);
425         assert(source->iovw.count);
426
427         r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
428         if (r < 0)
429                 log_error("Failed to write entry of %zu bytes: %s",
430                           iovw_size(&source->iovw), strerror(-r));
431         else
432                 r = 1;
433
434  freeing:
435         iovw_free_contents(&source->iovw);
436         return r;
437 }