chiark / gitweb /
journal-remote: tool to receive messages over the network
[elogind.git] / src / journal / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew JÄ™drzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 1024u
26
27 void source_free(RemoteSource *source) {
28         if (!source)
29                 return;
30
31         if (source->fd >= 0) {
32                 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33                 close(source->fd);
34         }
35         free(source->name);
36         free(source->buf);
37         iovw_free_contents(&source->iovw);
38         free(source);
39 }
40
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
42         ssize_t n, remain;
43         char *c;
44         char *newbuf = NULL;
45         size_t newsize = 0;
46
47         assert(source);
48         assert(source->state == STATE_LINE);
49         assert(source->filled <= source->size);
50         assert(source->buf == NULL || source->size > 0);
51
52         c = memchr(source->buf, '\n', source->filled);
53         if (c != NULL)
54                 goto docopy;
55
56  resize:
57         if (source->size - source->filled < LINE_CHUNK) {
58                 // XXX: add check for maximum line length
59
60                 if (!GREEDY_REALLOC(source->buf, source->size,
61                                     source->filled + LINE_CHUNK))
62                         return log_oom();
63         }
64         assert(source->size - source->filled >= LINE_CHUNK);
65
66         n = read(source->fd, source->buf + source->filled,
67                  source->size - source->filled);
68         if (n < 0) {
69                 if (errno != EAGAIN && errno != EWOULDBLOCK)
70                         log_error("read(%d, ..., %zd): %m", source->fd,
71                                   source->size - source->filled);
72                 return -errno;
73         } else if (n == 0)
74                 return 0;
75
76         c = memchr(source->buf + source->filled, '\n', n);
77         source->filled += n;
78
79         if (c == NULL)
80                 goto resize;
81
82  docopy:
83         *line = source->buf;
84         *size = c + 1 - source->buf;
85
86         /* Check if something remains */
87         remain = source->buf + source->filled - c - 1;
88         assert(remain >= 0);
89         if (remain) {
90                 newsize = MAX(remain, LINE_CHUNK);
91                 newbuf = malloc(newsize);
92                 if (!newbuf)
93                         return log_oom();
94                 memcpy(newbuf, c + 1, remain);
95         }
96         source->buf = newbuf;
97         source->size = newsize;
98         source->filled = remain;
99
100         return 1;
101 }
102
103 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
104         int n;
105         char *newbuf = NULL;
106         size_t newsize = 0, remain;
107
108         assert(source);
109         assert(source->state == STATE_DATA_START ||
110                source->state == STATE_DATA ||
111                source->state == STATE_DATA_FINISH);
112         assert(size <= DATA_SIZE_MAX);
113         assert(source->filled <= source->size);
114         assert(source->buf != NULL || source->size == 0);
115         assert(source->buf == NULL || source->size > 0);
116         assert(data);
117
118         while(source->filled < size) {
119                 if (!GREEDY_REALLOC(source->buf, source->size, size))
120                         return log_oom();
121
122                 n = read(source->fd, source->buf + source->filled,
123                          source->size - source->filled);
124                 if (n < 0) {
125                         if (errno != EAGAIN && errno != EWOULDBLOCK)
126                                 log_error("read(%d, ..., %zd): %m", source->fd,
127                                           source->size - source->filled);
128                         return -errno;
129                 } else if (n == 0)
130                         return 0;
131
132                 source->filled += n;
133         }
134
135         *data = source->buf;
136
137         /* Check if something remains */
138         assert(size <= source->filled);
139         remain = source->filled - size;
140         if (remain) {
141                 newsize = MAX(remain, LINE_CHUNK);
142                 newbuf = malloc(newsize);
143                 if (!newbuf)
144                         return log_oom();
145                 memcpy(newbuf, source->buf + size, remain);
146         }
147         source->buf = newbuf;
148         source->size = newsize;
149         source->filled = remain;
150
151         return 1;
152 }
153
154 static int get_data_size(RemoteSource *source) {
155         int r;
156         void _cleanup_free_ *data = NULL;
157
158         assert(source);
159         assert(source->state == STATE_DATA_START);
160         assert(source->data_size == 0);
161
162         r = fill_fixed_size(source, &data, sizeof(uint64_t));
163         if (r <= 0)
164                 return r;
165
166         source->data_size = le64toh( *(uint64_t *) data );
167         if (source->data_size > DATA_SIZE_MAX) {
168                 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
169                           source->data_size, DATA_SIZE_MAX);
170                 return -EINVAL;
171         }
172         if (source->data_size == 0)
173                 log_warning("Binary field with zero length");
174
175         return 1;
176 }
177
178 static int get_data_data(RemoteSource *source, void **data) {
179         int r;
180
181         assert(source);
182         assert(data);
183         assert(source->state == STATE_DATA);
184
185         r = fill_fixed_size(source, data, source->data_size);
186         if (r <= 0)
187                 return r;
188
189         return 1;
190 }
191
192 static int get_data_newline(RemoteSource *source) {
193         int r;
194         char _cleanup_free_ *data = NULL;
195
196         assert(source);
197         assert(source->state == STATE_DATA_FINISH);
198
199         r = fill_fixed_size(source, (void**) &data, 1);
200         if (r <= 0)
201                 return r;
202
203         assert(data);
204         if (*data != '\n') {
205                 log_error("expected newline, got '%c'", *data);
206                 return -EINVAL;
207         }
208
209         return 1;
210 }
211
212 static int process_dunder(RemoteSource *source, char *line, size_t n) {
213         const char *timestamp;
214         int r;
215
216         assert(line);
217         assert(n > 0);
218         assert(line[n-1] == '\n');
219
220         /* XXX: is it worth to support timestamps in extended format?
221          * We don't produce them, but who knows... */
222
223         timestamp = startswith(line, "__CURSOR=");
224         if (timestamp)
225                 /* ignore __CURSOR */
226                 return 1;
227
228         timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
229         if (timestamp) {
230                 long long unsigned x;
231                 line[n-1] = '\0';
232                 r = safe_atollu(timestamp, &x);
233                 if (r < 0)
234                         log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
235                 else
236                         source->ts.realtime = x;
237                 return r < 0 ? r : 1;
238         }
239
240         timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
241         if (timestamp) {
242                 long long unsigned x;
243                 line[n-1] = '\0';
244                 r = safe_atollu(timestamp, &x);
245                 if (r < 0)
246                         log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
247                 else
248                         source->ts.monotonic = x;
249                 return r < 0 ? r : 1;
250         }
251
252         timestamp = startswith(line, "__");
253         if (timestamp) {
254                 log_notice("Unknown dunder line %s", line);
255                 return 1;
256         }
257
258         /* no dunder */
259         return 0;
260 }
261
262 int process_data(RemoteSource *source) {
263         int r;
264
265         switch(source->state) {
266         case STATE_LINE: {
267                 char *line, *sep;
268                 size_t n;
269
270                 assert(source->data_size == 0);
271
272                 r = get_line(source, &line, &n);
273                 if (r < 0)
274                         return r;
275                 if (r == 0) {
276                         source->state = STATE_EOF;
277                         return r;
278                 }
279                 assert(n > 0);
280                 assert(line[n-1] == '\n');
281
282                 if (n == 1) {
283                         log_debug("Received empty line, event is ready");
284                         free(line);
285                         return 1;
286                 }
287
288                 r = process_dunder(source, line, n);
289                 if (r != 0) {
290                         free(line);
291                         return r < 0 ? r : 0;
292                 }
293
294                 /* MESSAGE=xxx\n
295                    or
296                    COREDUMP\n
297                    LLLLLLLL0011223344...\n
298                 */
299                 sep = memchr(line, '=', n);
300                 if (sep)
301                         /* chomp newline */
302                         n--;
303                 else
304                         /* replace \n with = */
305                         line[n-1] = '=';
306                 log_debug("Received: %.*s", (int) n, line);
307
308                 r = iovw_put(&source->iovw, line, n);
309                 if (r < 0) {
310                         log_error("Failed to put line in iovect");
311                         free(line);
312                         return r;
313                 }
314
315                 if (!sep)
316                         source->state = STATE_DATA_START;
317                 return 0; /* continue */
318         }
319
320         case STATE_DATA_START:
321                 assert(source->data_size == 0);
322
323                 r = get_data_size(source);
324                 log_debug("get_data_size() -> %d", r);
325                 if (r < 0)
326                         return r;
327                 if (r == 0) {
328                         source->state = STATE_EOF;
329                         return 0;
330                 }
331
332                 source->state = source->data_size > 0 ?
333                         STATE_DATA : STATE_DATA_FINISH;
334
335                 return 0; /* continue */
336
337         case STATE_DATA: {
338                 void *data;
339
340                 assert(source->data_size > 0);
341
342                 r = get_data_data(source, &data);
343                 log_debug("get_data_data() -> %d", r);
344                 if (r < 0)
345                         return r;
346                 if (r == 0) {
347                         source->state = STATE_EOF;
348                         return 0;
349                 }
350
351                 assert(data);
352
353                 r = iovw_put(&source->iovw, data, source->data_size);
354                 if (r < 0) {
355                         log_error("failed to put binary buffer in iovect");
356                         return r;
357                 }
358
359                 source->state = STATE_DATA_FINISH;
360
361                 return 0; /* continue */
362         }
363
364         case STATE_DATA_FINISH:
365                 r = get_data_newline(source);
366                 log_debug("get_data_newline() -> %d", r);
367                 if (r < 0)
368                         return r;
369                 if (r == 0) {
370                         source->state = STATE_EOF;
371                         return 0;
372                 }
373
374                 source->data_size = 0;
375                 source->state = STATE_LINE;
376
377                 return 0; /* continue */
378         default:
379                 assert_not_reached("wtf?");
380         }
381 }
382
383 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
384         int r;
385
386         assert(source);
387         assert(writer);
388
389         r = process_data(source);
390         if (r <= 0)
391                 return r;
392
393         /* We have a full event */
394         log_info("Received a full event from source@%p fd:%d (%s)",
395                  source, source->fd, source->name);
396
397         if (!source->iovw.count) {
398                 log_warning("Entry with no payload, skipping");
399                 goto freeing;
400         }
401
402         assert(source->iovw.iovec);
403         assert(source->iovw.count);
404
405         r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
406         if (r < 0)
407                 log_error("Failed to write entry of %zu bytes: %s",
408                           iovw_size(&source->iovw), strerror(-r));
409         else
410                 r = 1;
411
412  freeing:
413         iovw_free_contents(&source->iovw);
414         return r;
415 }