chiark / gitweb /
f8979daca67e0c5e0de5cc684dd6b0b2e8dd2f44
[elogind.git] / src / journal / journal-remote.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2012 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/prctl.h>
29 #include <sys/socket.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <unistd.h>
33 #include <getopt.h>
34
35 #include "sd-daemon.h"
36 #include "sd-event.h"
37 #include "journal-file.h"
38 #include "journald-native.h"
39 #include "socket-util.h"
40 #include "mkdir.h"
41 #include "build.h"
42 #include "macro.h"
43 #include "strv.h"
44
45 #include "journal-remote-parse.h"
46 #include "journal-remote-write.h"
47
/* Default output path template: takes the machine ID and a per-source name. */
#define REMOTE_JOURNAL_PATH "/var/log/journal/" SD_ID128_FORMAT_STR "/remote-%s.journal"

/* Command line settings; assigned in parse_argv(). */
static char *arg_output = NULL;         /* -o / --output */
static char *arg_url = NULL;            /* --url */
static char *arg_getter = NULL;         /* --getter */
static char *arg_listen_raw = NULL;     /* --listen-raw */
static bool arg_stdin = false;          /* --stdin */
static int arg_compress = true;         /* --compress / --no-compress */
static int arg_seal = false;            /* --seal / --no-seal */
57
58 /**********************************************************************
59  **********************************************************************
60  **********************************************************************/
61
/*
 * Fork and exec `child` with argument vector `argv`, with the child's
 * standard output redirected into a pipe.
 *
 * Returns the read end of that pipe (>= 0) on success, or a negative
 * errno-style value if pipe() or fork() failed. Failures that occur in
 * the child after the fork (dup2, prctl, exec) make the child _exit()
 * and are not visible in the return value.
 */
static int spawn_child(const char* child, char** argv) {
        int fd[2];
        pid_t parent_pid, child_pid;
        int r;

        if (pipe(fd) < 0) {
                /* Capture errno before logging, same as the fork() failure path below. */
                r = -errno;
                log_error("Failed to create pipe: %m");
                return r;
        }

        parent_pid = getpid();

        child_pid = fork();
        if (child_pid < 0) {
                r = -errno;
                log_error("Failed to fork: %m");
                close_pipe(fd);
                return r;
        }

        /* In the child */
        if (child_pid == 0) {
                r = dup2(fd[1], STDOUT_FILENO);
                if (r < 0) {
                        log_error("Failed to dup pipe to stdout: %m");
                        _exit(EXIT_FAILURE);
                }

                /* stdout now refers to the pipe; the original pipe fds are
                 * no longer needed in the child. */
                r = close_pipe(fd);
                if (r < 0)
                        log_warning("Failed to close pipe fds: %m");

                /* Make sure the child goes away when the parent dies */
                if (prctl(PR_SET_PDEATHSIG, SIGTERM) < 0)
                        _exit(EXIT_FAILURE);

                /* Check whether our parent died before we were able
                 * to set the death signal */
                if (getppid() != parent_pid)
                        _exit(EXIT_SUCCESS);

                execvp(child, argv);
                /* execvp() only returns on failure */
                log_error("Failed to exec child %s: %m", child);
                _exit(EXIT_FAILURE);
        }

        /* In the parent: keep only the read end. */
        r = close(fd[1]);
        if (r < 0)
                log_warning("Failed to close write end of pipe: %m");

        return fd[0];
}
114
/*
 * Run "curl" against `url`, asking for the journal export format via the
 * Accept header, and return what spawn_child() returns: a readable fd
 * (>= 0) carrying curl's standard output, or a negative errno-style value.
 */
static int spawn_curl(char* url) {
        char **argv = STRV_MAKE("curl",
                                "-HAccept: application/vnd.fdo.journal",
                                "--silent",
                                "--show-error",
                                url);
        int r;

        r = spawn_child("curl", argv);
        if (r < 0)
                /* Report the error carried in r; errno may have been
                 * changed since the failing call inside spawn_child(). */
                log_error("Failed to spawn curl: %s", strerror(-r));
        return r;
}
128
129 static int spawn_getter(char *getter, char *url) {
130         int r;
131         char _cleanup_strv_free_ **words = NULL, **words2 = NULL;
132
133         assert(getter);
134         words = strv_split_quoted(getter);
135         if (!words)
136                 return log_oom();
137
138         r = spawn_child(words[0], words);
139         if (r < 0)
140                 log_error("Failed to spawn getter %s: %m", getter);
141
142         return r;
143 }
144
145 static int open_output(Writer *s, const char* url) {
146         char _cleanup_free_ *name, *output = NULL;
147         char *c;
148         int r;
149
150         assert(url);
151         name = strdup(url);
152         if (!name)
153                 return log_oom();
154
155         for(c = name; *c; c++) {
156                 if (*c == '/' || *c == ':' || *c == ' ')
157                         *c = '~';
158                 else if (*c == '?') {
159                         *c = '\0';
160                         break;
161                 }
162         }
163
164         if (!arg_output) {
165                 sd_id128_t machine;
166                 r = sd_id128_get_machine(&machine);
167                 if (r < 0) {
168                         log_error("failed to determine machine ID128: %s", strerror(-r));
169                         return r;
170                 }
171
172                 r = asprintf(&output, REMOTE_JOURNAL_PATH,
173                              SD_ID128_FORMAT_VAL(machine), name);
174                 if (r < 0)
175                         return log_oom();
176         } else {
177                 r = is_dir(arg_output);
178                 if (r > 0) {
179                         r = asprintf(&output,
180                                      "%s/remote-%s.journal", arg_output, name);
181                         if (r < 0)
182                                 return log_oom();
183                 } else {
184                         output = strdup(arg_output);
185                         if (!output)
186                                 return log_oom();
187                 }
188         }
189
190         r = journal_file_open_reliably(output,
191                                        O_RDWR|O_CREAT, 0640,
192                                        arg_compress, arg_seal,
193                                        &s->metrics,
194                                        s->mmap,
195                                        NULL, &s->journal);
196         if (r < 0)
197                 log_error("Failed to open output journal %s: %s",
198                           arg_output, strerror(-r));
199         else
200                 log_info("Opened output file %s", s->journal->path);
201         return r;
202 }
203
204 typedef struct RemoteServer {
205         RemoteSource **sources;
206         ssize_t sources_size;
207         ssize_t active;
208
209         sd_event *events;
210         sd_event_source *sigterm_event, *sigint_event, *listen_event;
211
212         Writer writer;
213 } RemoteServer;
214
215 static int dispatch_raw_source_event(sd_event_source *event,
216                                      int fd,
217                                      uint32_t revents,
218                                      void *userdata);
219 static int dispatch_raw_connection_event(sd_event_source *event,
220                                          int fd,
221                                          uint32_t revents,
222                                          void *userdata);
223
224 static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) {
225         assert(fd >= 0);
226         assert(source);
227
228         if (!GREEDY_REALLOC0_T(s->sources, s->sources_size, fd + 1))
229                 return log_oom();
230
231         if (s->sources[fd] == NULL) {
232                 s->sources[fd] = new0(RemoteSource, 1);
233                 if (!s->sources[fd])
234                         return log_oom();
235                 s->sources[fd]->fd = -1;
236                 s->active++;
237         }
238
239         *source = s->sources[fd];
240         return 0;
241 }
242
243 static int remove_source(RemoteServer *s, int fd) {
244         RemoteSource *source;
245
246         assert(s);
247         assert(fd >= 0);
248         assert(fd < s->sources_size);
249
250         source = s->sources[fd];
251         if (source) {
252                 source_free(source);
253                 s->sources[fd] = NULL;
254                 s->active--;
255         }
256
257         close(fd);
258
259         return 0;
260 }
261
262 static int add_source(RemoteServer *s, int fd, const char* name) {
263         RemoteSource *source = NULL;
264         char *realname;
265         int r;
266
267         assert(s);
268         assert(fd >= 0);
269
270         if (name) {
271                 realname = strdup(name);
272                 if (!realname)
273                         return log_oom();
274         } else {
275                 r = asprintf(&realname, "fd:%d", fd);
276                 if (r < 0)
277                         return log_oom();
278         }
279
280         log_debug("Creating source for fd:%d (%s)", fd, name);
281
282         r = get_source_for_fd(s, fd, &source);
283         if (r < 0) {
284                 log_error("Failed to create source for fd:%d (%s)", fd, name);
285                 return r;
286         }
287         assert(source);
288         assert(source->fd < 0);
289         source->fd = fd;
290
291         r = sd_event_add_io(s->events, &source->event,
292                             fd, EPOLLIN, dispatch_raw_source_event, s);
293         if (r < 0) {
294                 log_error("Failed to register event source for fd:%d: %s",
295                           fd, strerror(-r));
296                 goto error;
297         }
298
299         return 1; /* work to do */
300
301  error:
302         remove_source(s, fd);
303         return r;
304 }
305
306 static int setup_raw_socket(RemoteServer *s, const char *address) {
307         int fd, r;
308
309         fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
310         if (fd < 0)
311                 return fd;
312
313         r = sd_event_add_io(s->events, &s->listen_event, fd, EPOLLIN,
314                             dispatch_raw_connection_event, s);
315         if (r < 0) {
316                 close(fd);
317                 return r;
318         }
319
320         s->active ++;
321         return 0;
322 }
323
324 /**********************************************************************
325  **********************************************************************
326  **********************************************************************/
327
328 static int dispatch_sigterm(sd_event_source *event,
329                             const struct signalfd_siginfo *si,
330                             void *userdata) {
331         RemoteServer *s = userdata;
332
333         assert(s);
334
335         log_received_signal(LOG_INFO, si);
336
337         sd_event_exit(s->events, 0);
338         return 0;
339 }
340
341 static int setup_signals(RemoteServer *s) {
342         sigset_t mask;
343         int r;
344
345         assert(s);
346
347         assert_se(sigemptyset(&mask) == 0);
348         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
349         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
350
351         r = sd_event_add_signal(s->events, &s->sigterm_event, SIGTERM, dispatch_sigterm, s);
352         if (r < 0)
353                 return r;
354
355         r = sd_event_add_signal(s->events, &s->sigint_event, SIGINT, dispatch_sigterm, s);
356         if (r < 0)
357                 return r;
358
359         return 0;
360 }
361
362 static int remoteserver_init(RemoteServer *s) {
363         int r, n, fd;
364         const char *output_name = NULL;
365
366         assert(s);
367
368         sd_event_default(&s->events);
369
370         setup_signals(s);
371
372         n = sd_listen_fds(true);
373         if (n < 0) {
374                 log_error("Failed to read listening file descriptors from environment: %s",
375                           strerror(-n));
376                 return n;
377         } else
378                 log_info("Received %d descriptors", n);
379
380         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
381                 if (sd_is_socket(fd, AF_UNSPEC, 0, false)) {
382                         assert_not_reached("not implemented");
383                 } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
384                         log_info("Received a connection socket (fd:%d)", fd);
385
386                         r = add_source(s, fd, NULL);
387                         output_name = "socket";
388                 } else {
389                         log_error("Unknown socket passed on fd:%d", fd);
390                         return -EINVAL;
391                 }
392         }
393
394         if (arg_url) {
395                 char _cleanup_free_ *url = NULL;
396                 char _cleanup_strv_free_ **urlv = strv_new(arg_url, "/entries", NULL);
397                 if (!urlv)
398                         return log_oom();
399                 url = strv_join(urlv, "");
400                 if (!url)
401                         return log_oom();
402
403                 if (arg_getter) {
404                         log_info("Spawning getter %s...", url);
405                         fd = spawn_getter(arg_getter, url);
406                 } else {
407                         log_info("Spawning curl %s...", url);
408                         fd = spawn_curl(url);
409                 }
410                 if (fd < 0)
411                         return fd;
412
413                 r = add_source(s, fd, arg_url);
414                 if (r < 0)
415                         return r;
416
417                 output_name = arg_url;
418         }
419
420         if (arg_listen_raw) {
421                 log_info("Listening on a socket...");
422                 r = setup_raw_socket(s, arg_listen_raw);
423                 if (r < 0)
424                         return r;
425
426                 output_name = arg_listen_raw;
427         }
428
429         if (arg_stdin) {
430                 log_info("Reading standard input...");
431                 r = add_source(s, STDIN_FILENO, "stdin");
432                 if (r < 0)
433                         return r;
434
435                 output_name = "stdin";
436         }
437
438         if (s->active == 0) {
439                 log_error("Zarro sources specified");
440                 return -EINVAL;
441         }
442
443         if (!!n + !!arg_url + !!arg_listen_raw + !!arg_stdin > 1)
444                 output_name = "multiple";
445
446         r = writer_init(&s->writer);
447         if (r < 0)
448                 return r;
449
450         r = open_output(&s->writer, output_name);
451         return r;
452 }
453
454 static int server_destroy(RemoteServer *s) {
455         int r;
456         ssize_t i;
457
458         r = writer_close(&s->writer);
459
460         assert(s->sources_size == 0 || s->sources);
461         for(i = 0; i < s->sources_size; i++)
462                 remove_source(s, i);
463
464         free(s->sources);
465
466         sd_event_source_unref(s->sigterm_event);
467         sd_event_source_unref(s->sigint_event);
468         sd_event_source_unref(s->listen_event);
469         sd_event_unref(s->events);
470
471         /* fds that we're listening on remain open... */
472
473         return r;
474 }
475
476 /**********************************************************************
477  **********************************************************************
478  **********************************************************************/
479
480 static int dispatch_raw_source_event(sd_event_source *event,
481                                      int fd,
482                                      uint32_t revents,
483                                      void *userdata) {
484
485         RemoteServer *s = userdata;
486         RemoteSource *source;
487         int r;
488
489         assert(fd < s->sources_size);
490         source = s->sources[fd];
491         assert(source->fd == fd);
492
493         r = process_source(source, &s->writer, arg_compress, arg_seal);
494         if (source->state == STATE_EOF) {
495                 log_info("EOF reached with source fd:%d (%s)",
496                          source->fd, source->name);
497                 if (source_non_empty(source))
498                         log_warning("EOF reached with incomplete data");
499                 remove_source(s, source->fd);
500                 log_info("%zd active source remaining", s->active);
501         } else if (r == -E2BIG) {
502                 log_error("Entry too big, skipped");
503                 r = 1;
504         }
505
506         return r;
507 }
508
509 static int dispatch_raw_connection_event(sd_event_source *event,
510                                          int fd,
511                                          uint32_t revents,
512                                          void *userdata) {
513         RemoteServer *s = userdata;
514
515         SocketAddress addr = {
516                 .size = sizeof(union sockaddr_union),
517                 .type = SOCK_STREAM,
518         };
519         int fd2, r;
520
521         log_debug("Accepting new connection on fd:%d", fd);
522         fd2 = accept4(fd, &addr.sockaddr.sa, &addr.size, SOCK_NONBLOCK|SOCK_CLOEXEC);
523         if (fd2 < 0) {
524                 log_error("accept() on fd:%d failed: %m", fd);
525                 return -errno;
526         }
527
528         switch(socket_address_family(&addr)) {
529         case AF_INET:
530         case AF_INET6: {
531                 char* _cleanup_free_ a = NULL;
532
533                 r = socket_address_print(&addr, &a);
534                 if (r < 0) {
535                         log_error("socket_address_print(): %s", strerror(-r));
536                         close(fd2);
537                         return r;
538                 }
539
540                 log_info("Accepted %s connection from %s",
541                          socket_address_family(&addr) == AF_INET ? "IP" : "IPv6",
542                          a);
543                 break;
544         };
545         default:
546                 log_error("Connection with unsupported family %d",
547                           socket_address_family(&addr));
548                 close(fd2);
549                 return -EINVAL;
550         }
551
552         r = add_source(s, fd2, NULL);
553         if (r < 0)
554                 log_error("failed to create source from fd:%d: %s", fd2, strerror(-r));
555
556         return r;
557 }
558
559
560 /**********************************************************************
561  **********************************************************************
562  **********************************************************************/
563
/*
 * Print the usage summary to standard output. Always returns 0.
 */
static int help(void) {
        /* The DIR file name pattern below matches what open_output() creates
         * ("remote-<name>.journal"). */
        printf("%s [OPTIONS...]\n\n"
               "Write external journal events to a journal file.\n\n"
               "Options:\n"
               "  --url=URL            Read events from systemd-journal-gatewayd at URL\n"
               "  --getter=COMMAND     Read events from the output of COMMAND\n"
               "  --listen-raw=ADDR    Listen for connections at ADDR\n"
               "  --stdin              Read events from standard input\n"
               "  -o --output=FILE|DIR Write output to FILE or DIR/remote-*.journal\n"
               "  --[no-]compress      Use XZ-compression in the output journal (default: yes)\n"
               "  --[no-]seal          Use Event sealing in the output journal (default: no)\n"
               "  -h --help            Show this help and exit\n"
               "  --version            Print version string and exit\n"
               "\n"
               "Note: file descriptors from sd_listen_fds() will be consumed, too.\n"
               , program_invocation_short_name);

        return 0;
}
583
584 static int parse_argv(int argc, char *argv[]) {
585         enum {
586                 ARG_VERSION = 0x100,
587                 ARG_URL,
588                 ARG_LISTEN_RAW,
589                 ARG_STDIN,
590                 ARG_GETTER,
591                 ARG_COMPRESS,
592                 ARG_NO_COMPRESS,
593                 ARG_SEAL,
594                 ARG_NO_SEAL,
595         };
596
597         static const struct option options[] = {
598                 { "help",         no_argument,       NULL, 'h'              },
599                 { "version",      no_argument,       NULL, ARG_VERSION      },
600                 { "url",          required_argument, NULL, ARG_URL          },
601                 { "getter",       required_argument, NULL, ARG_GETTER       },
602                 { "listen-raw",   required_argument, NULL, ARG_LISTEN_RAW   },
603                 { "stdin",        no_argument,       NULL, ARG_STDIN        },
604                 { "output",       required_argument, NULL, 'o'              },
605                 { "compress",     no_argument,       NULL, ARG_COMPRESS     },
606                 { "no-compress",  no_argument,       NULL, ARG_NO_COMPRESS  },
607                 { "seal",         no_argument,       NULL, ARG_SEAL         },
608                 { "no-seal",      no_argument,       NULL, ARG_NO_SEAL      },
609                 {}
610         };
611
612         int c;
613
614         assert(argc >= 0);
615         assert(argv);
616
617         while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0)
618                 switch(c) {
619                 case 'h':
620                         help();
621                         return 0 /* done */;
622
623                 case ARG_VERSION:
624                         puts(PACKAGE_STRING);
625                         puts(SYSTEMD_FEATURES);
626                         return 0 /* done */;
627
628                 case ARG_URL:
629                         if (arg_url) {
630                                 log_error("cannot currently set more than one --url");
631                                 return -EINVAL;
632                         }
633
634                         arg_url = optarg;
635                         break;
636
637                 case ARG_GETTER:
638                         if (arg_getter) {
639                                 log_error("cannot currently use --getter more than once");
640                                 return -EINVAL;
641                         }
642
643                         arg_getter = optarg;
644                         break;
645
646                 case ARG_LISTEN_RAW:
647                         if (arg_listen_raw) {
648                                 log_error("cannot currently use --listen-raw more than once");
649                                 return -EINVAL;
650                         }
651
652                         arg_listen_raw = optarg;
653                         break;
654
655                 case ARG_STDIN:
656                         arg_stdin = true;
657                         break;
658
659                 case 'o':
660                         if (arg_output) {
661                                 log_error("cannot use --output/-o more than once");
662                                 return -EINVAL;
663                         }
664
665                         arg_output = optarg;
666                         break;
667
668                 case ARG_COMPRESS:
669                         arg_compress = true;
670                         break;
671                 case ARG_NO_COMPRESS:
672                         arg_compress = false;
673                         break;
674                 case ARG_SEAL:
675                         arg_seal = true;
676                         break;
677                 case ARG_NO_SEAL:
678                         arg_seal = false;
679                         break;
680
681                 case '?':
682                         return -EINVAL;
683
684                 default:
685                         log_error("Unknown option code %c", c);
686                         return -EINVAL;
687                 }
688
689         if (optind < argc) {
690                 log_error("This program takes no positional arguments");
691                 return -EINVAL;
692         }
693
694         return 1 /* work to do */;
695 }
696
697 int main(int argc, char **argv) {
698         RemoteServer s = {};
699         int r, r2;
700
701         log_set_max_level(LOG_DEBUG);
702         log_show_color(true);
703         log_parse_environment();
704
705         r = parse_argv(argc, argv);
706         if (r <= 0)
707                 return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
708
709         if (remoteserver_init(&s) < 0)
710                 return EXIT_FAILURE;
711
712         log_debug("%s running as pid %lu",
713                   program_invocation_short_name, (unsigned long) getpid());
714         sd_notify(false,
715                   "READY=1\n"
716                   "STATUS=Processing requests...");
717
718         while (s.active) {
719                 r = sd_event_get_state(s.events);
720                 if (r < 0)
721                         break;
722                 if (r == SD_EVENT_FINISHED)
723                         break;
724
725                 r = sd_event_run(s.events, -1);
726                 if (r < 0) {
727                         log_error("Failed to run event loop: %s", strerror(-r));
728                         break;
729                 }
730         }
731
732         log_info("Finishing after writing %" PRIu64 " entries", s.writer.seqnum);
733         r2 = server_destroy(&s);
734
735         sd_notify(false, "STATUS=Shutting down...");
736
737         return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;
738 }