1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2012 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
28 #include <sys/prctl.h>
29 #include <sys/socket.h>
31 #include <sys/types.h>
35 #include "sd-daemon.h"
37 #include "journal-file.h"
38 #include "journald-native.h"
39 #include "socket-util.h"
45 #include "socket-util.h"
46 #include "microhttpd-util.h"
49 #include <gnutls/gnutls.h>
52 #include "journal-remote-parse.h"
53 #include "journal-remote-write.h"
55 #define REMOTE_JOURNAL_PATH "/var/log/journal/" SD_ID128_FORMAT_STR "/remote-%s.journal"
57 static char* arg_output = NULL;
58 static char* arg_url = NULL;
59 static char* arg_getter = NULL;
60 static bool arg_stdin = false;
61 static char* arg_listen_raw = NULL;
62 static char* arg_listen_http = NULL;
63 static char* arg_listen_https = NULL;
64 static int arg_compress = true;
65 static int arg_seal = false;
67 static char *key_pem = NULL;
68 static char *cert_pem = NULL;
69 static char *trust_pem = NULL;
71 /**********************************************************************
72 **********************************************************************
73 **********************************************************************/
75 static int spawn_child(const char* child, char** argv) {
77 pid_t parent_pid, child_pid;
81 log_error("Failed to create pager pipe: %m");
85 parent_pid = getpid();
90 log_error("Failed to fork: %m");
97 r = dup2(fd[1], STDOUT_FILENO);
99 log_error("Failed to dup pipe to stdout: %m");
105 log_warning("Failed to close pipe fds: %m");
107 /* Make sure the child goes away when the parent dies */
108 if (prctl(PR_SET_PDEATHSIG, SIGTERM) < 0)
111 /* Check whether our parent died before we were able
112 * to set the death signal */
113 if (getppid() != parent_pid)
117 log_error("Failed to exec child %s: %m", child);
123 log_warning("Failed to close write end of pipe: %m");
128 static int spawn_curl(char* url) {
129 char **argv = STRV_MAKE("curl",
130 "-HAccept: application/vnd.fdo.journal",
136 r = spawn_child("curl", argv);
138 log_error("Failed to spawn curl: %m");
142 static int spawn_getter(char *getter, char *url) {
144 char _cleanup_strv_free_ **words = NULL, **words2 = NULL;
147 words = strv_split_quoted(getter);
151 r = spawn_child(words[0], words);
153 log_error("Failed to spawn getter %s: %m", getter);
158 static int open_output(Writer *s, const char* url) {
159 char _cleanup_free_ *name, *output = NULL;
168 for(c = name; *c; c++) {
169 if (*c == '/' || *c == ':' || *c == ' ')
171 else if (*c == '?') {
179 r = sd_id128_get_machine(&machine);
181 log_error("failed to determine machine ID128: %s", strerror(-r));
185 r = asprintf(&output, REMOTE_JOURNAL_PATH,
186 SD_ID128_FORMAT_VAL(machine), name);
190 r = is_dir(arg_output);
192 r = asprintf(&output,
193 "%s/remote-%s.journal", arg_output, name);
197 output = strdup(arg_output);
203 r = journal_file_open_reliably(output,
204 O_RDWR|O_CREAT, 0640,
205 arg_compress, arg_seal,
210 log_error("Failed to open output journal %s: %s",
211 arg_output, strerror(-r));
213 log_info("Opened output file %s", s->journal->path);
217 /**********************************************************************
218 **********************************************************************
219 **********************************************************************/
221 typedef struct MHDDaemonWrapper {
223 struct MHD_Daemon *daemon;
225 sd_event_source *event;
228 typedef struct RemoteServer {
229 RemoteSource **sources;
230 ssize_t sources_size;
234 sd_event_source *sigterm_event, *sigint_event, *listen_event;
241 /* This should go away as soon as µhttpd allows state to be passed around. */
242 static RemoteServer *server;
244 static int dispatch_raw_source_event(sd_event_source *event,
248 static int dispatch_raw_connection_event(sd_event_source *event,
252 static int dispatch_http_event(sd_event_source *event,
257 static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) {
261 if (!GREEDY_REALLOC0_T(s->sources, s->sources_size, fd + 1))
264 if (s->sources[fd] == NULL) {
265 s->sources[fd] = new0(RemoteSource, 1);
268 s->sources[fd]->fd = -1;
272 *source = s->sources[fd];
276 static int remove_source(RemoteServer *s, int fd) {
277 RemoteSource *source;
281 assert(fd < s->sources_size);
283 source = s->sources[fd];
286 s->sources[fd] = NULL;
295 static int add_source(RemoteServer *s, int fd, const char* name) {
296 RemoteSource *source = NULL;
304 realname = strdup(name);
308 r = asprintf(&realname, "fd:%d", fd);
313 log_debug("Creating source for fd:%d (%s)", fd, name);
315 r = get_source_for_fd(s, fd, &source);
317 log_error("Failed to create source for fd:%d (%s)", fd, name);
321 assert(source->fd < 0);
324 r = sd_event_add_io(s->events, &source->event,
325 fd, EPOLLIN, dispatch_raw_source_event, s);
327 log_error("Failed to register event source for fd:%d: %s",
332 return 1; /* work to do */
335 remove_source(s, fd);
339 static int setup_raw_socket(RemoteServer *s, const char *address) {
342 fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
346 r = sd_event_add_io(s->events, &s->listen_event, fd, EPOLLIN,
347 dispatch_raw_connection_event, s);
357 /**********************************************************************
358 **********************************************************************
359 **********************************************************************/
361 static RemoteSource *request_meta(void **connection_cls) {
362 RemoteSource *source;
364 assert(connection_cls);
366 return *connection_cls;
368 source = new0(RemoteSource, 1);
373 log_debug("Added RemoteSource as connection metadata %p", source);
375 *connection_cls = source;
379 static void request_meta_free(void *cls,
380 struct MHD_Connection *connection,
381 void **connection_cls,
382 enum MHD_RequestTerminationCode toe) {
385 assert(connection_cls);
388 log_debug("Cleaning up connection metadata %p", s);
390 *connection_cls = NULL;
393 static int process_http_upload(
394 struct MHD_Connection *connection,
395 const char *upload_data,
396 size_t *upload_data_size,
397 RemoteSource *source) {
399 bool finished = false;
404 log_debug("request_handler_upload: connection %p, %zu bytes",
405 connection, *upload_data_size);
407 if (*upload_data_size) {
408 log_info("Received %zu bytes", *upload_data_size);
410 r = push_data(source, upload_data, *upload_data_size);
412 log_error("Failed to store received data of size %zu: %s",
413 *upload_data_size, strerror(-r));
414 return respond_oom_internal(connection);
416 *upload_data_size = 0;
421 r = process_source(source, &server->writer, arg_compress, arg_seal);
423 log_warning("Entry too big, skipped");
424 else if (r == -EAGAIN || r == -EWOULDBLOCK)
427 log_warning("Failed to process data for connection %p", connection);
428 return respond_error(connection, MHD_HTTP_UNPROCESSABLE_ENTITY,
429 "Processing failed: %s", strerror(-r));
436 /* The upload is finished */
438 if (source_non_empty(source)) {
439 log_warning("EOF reached with incomplete data");
440 return respond_error(connection, MHD_HTTP_EXPECTATION_FAILED,
441 "Trailing data not processed.");
444 return respond_error(connection, MHD_HTTP_ACCEPTED, "OK.\n");
447 static int request_handler(
449 struct MHD_Connection *connection,
453 const char *upload_data,
454 size_t *upload_data_size,
455 void **connection_cls) {
461 assert(connection_cls);
465 log_debug("Handling a connection %s %s %s", method, url, version);
468 return process_http_upload(connection,
469 upload_data, upload_data_size,
472 if (!streq(method, "POST"))
473 return respond_error(connection, MHD_HTTP_METHOD_NOT_ACCEPTABLE,
474 "Unsupported method.\n");
476 if (!streq(url, "/upload"))
477 return respond_error(connection, MHD_HTTP_NOT_FOUND,
480 header = MHD_lookup_connection_value(connection,
481 MHD_HEADER_KIND, "Content-Type");
482 if (!header || !streq(header, "application/vnd.fdo.journal"))
483 return respond_error(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
484 "Content-Type: application/vnd.fdo.journal"
488 r = check_permissions(connection, &code);
493 if (!request_meta(connection_cls))
494 return respond_oom(connection);
498 static int setup_microhttpd_server(RemoteServer *s, int fd, bool https) {
499 struct MHD_OptionItem opts[] = {
500 { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free},
501 { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger},
502 { MHD_OPTION_LISTEN_SOCKET, fd},
510 MHD_USE_PEDANTIC_CHECKS |
511 MHD_USE_EPOLL_LINUX_ONLY |
514 const union MHD_DaemonInfo *info;
520 r = fd_nonblock(fd, true);
522 log_error("Failed to make fd:%d nonblocking: %s", fd, strerror(-r));
527 opts[opts_pos++] = (struct MHD_OptionItem)
528 {MHD_OPTION_HTTPS_MEM_KEY, 0, key_pem};
529 opts[opts_pos++] = (struct MHD_OptionItem)
530 {MHD_OPTION_HTTPS_MEM_CERT, 0, cert_pem};
532 flags |= MHD_USE_SSL;
535 opts[opts_pos++] = (struct MHD_OptionItem)
536 {MHD_OPTION_HTTPS_MEM_TRUST, 0, trust_pem};
539 d = new(MHDDaemonWrapper, 1);
543 d->fd = (uint64_t) fd;
545 d->daemon = MHD_start_daemon(flags, 0,
547 request_handler, NULL,
548 MHD_OPTION_ARRAY, opts,
551 log_error("Failed to start µhttp daemon");
556 log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)",
557 https ? "HTTPS" : "HTTP", fd, d);
560 info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY);
562 log_error("µhttp returned NULL daemon info");
567 epoll_fd = info->listen_fd;
569 log_error("µhttp epoll fd is invalid");
574 r = sd_event_add_io(s->events, &d->event,
575 epoll_fd, EPOLLIN, dispatch_http_event, d);
577 log_error("Failed to add event callback: %s", strerror(-r));
581 r = hashmap_ensure_allocated(&s->daemons, uint64_hash_func, uint64_compare_func);
587 r = hashmap_put(s->daemons, &d->fd, d);
589 log_error("Failed to add daemon to hashmap: %s", strerror(-r));
597 MHD_stop_daemon(d->daemon);
603 static int setup_microhttpd_socket(RemoteServer *s,
608 fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
612 return setup_microhttpd_server(s, fd, https);
615 static int dispatch_http_event(sd_event_source *event,
619 MHDDaemonWrapper *d = userdata;
624 log_info("%s", __func__);
626 r = MHD_run(d->daemon);
628 log_error("MHD_run failed!");
629 // XXX: unregister daemon
633 return 1; /* work to do */
636 /**********************************************************************
637 **********************************************************************
638 **********************************************************************/
640 static int dispatch_sigterm(sd_event_source *event,
641 const struct signalfd_siginfo *si,
643 RemoteServer *s = userdata;
647 log_received_signal(LOG_INFO, si);
649 sd_event_exit(s->events, 0);
653 static int setup_signals(RemoteServer *s) {
659 assert_se(sigemptyset(&mask) == 0);
660 sigset_add_many(&mask, SIGINT, SIGTERM, -1);
661 assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
663 r = sd_event_add_signal(s->events, &s->sigterm_event, SIGTERM, dispatch_sigterm, s);
667 r = sd_event_add_signal(s->events, &s->sigint_event, SIGINT, dispatch_sigterm, s);
674 static int remoteserver_init(RemoteServer *s) {
676 const char *output_name = NULL;
680 sd_event_default(&s->events);
684 assert(server == NULL);
687 n = sd_listen_fds(true);
689 log_error("Failed to read listening file descriptors from environment: %s",
693 log_info("Received %d descriptors", n);
695 for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
696 if (sd_is_socket(fd, AF_UNSPEC, 0, false)) {
697 assert_not_reached("not implemented");
698 } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
699 log_info("Received a connection socket (fd:%d)", fd);
701 r = add_source(s, fd, NULL);
702 output_name = "socket";
704 log_error("Unknown socket passed on fd:%d", fd);
710 char _cleanup_free_ *url = NULL;
711 char _cleanup_strv_free_ **urlv = strv_new(arg_url, "/entries", NULL);
714 url = strv_join(urlv, "");
719 log_info("Spawning getter %s...", url);
720 fd = spawn_getter(arg_getter, url);
722 log_info("Spawning curl %s...", url);
723 fd = spawn_curl(url);
728 r = add_source(s, fd, arg_url);
732 output_name = arg_url;
735 if (arg_listen_raw) {
736 log_info("Listening on a socket...");
737 r = setup_raw_socket(s, arg_listen_raw);
741 output_name = arg_listen_raw;
744 if (arg_listen_http) {
745 r = setup_microhttpd_socket(s, arg_listen_http, false);
749 output_name = arg_listen_http;
752 if (arg_listen_https) {
753 r = setup_microhttpd_socket(s, arg_listen_https, true);
757 output_name = arg_listen_https;
761 log_info("Reading standard input...");
762 r = add_source(s, STDIN_FILENO, "stdin");
766 output_name = "stdin";
769 if (s->active == 0) {
770 log_error("Zarro sources specified");
774 if (!!n + !!arg_url + !!arg_listen_raw + !!arg_stdin > 1)
775 output_name = "multiple";
777 r = writer_init(&s->writer);
781 r = open_output(&s->writer, output_name);
785 static int server_destroy(RemoteServer *s) {
790 r = writer_close(&s->writer);
792 while ((d = hashmap_steal_first(s->daemons))) {
793 MHD_stop_daemon(d->daemon);
794 sd_event_source_unref(d->event);
798 hashmap_free(s->daemons);
800 assert(s->sources_size == 0 || s->sources);
801 for (i = 0; i < s->sources_size; i++)
806 sd_event_source_unref(s->sigterm_event);
807 sd_event_source_unref(s->sigint_event);
808 sd_event_source_unref(s->listen_event);
809 sd_event_unref(s->events);
811 /* fds that we're listening on remain open... */
816 /**********************************************************************
817 **********************************************************************
818 **********************************************************************/
820 static int dispatch_raw_source_event(sd_event_source *event,
825 RemoteServer *s = userdata;
826 RemoteSource *source;
829 assert(fd < s->sources_size);
830 source = s->sources[fd];
831 assert(source->fd == fd);
833 r = process_source(source, &s->writer, arg_compress, arg_seal);
834 if (source->state == STATE_EOF) {
835 log_info("EOF reached with source fd:%d (%s)",
836 source->fd, source->name);
837 if (source_non_empty(source))
838 log_warning("EOF reached with incomplete data");
839 remove_source(s, source->fd);
840 log_info("%zd active source remaining", s->active);
841 } else if (r == -E2BIG) {
842 log_error("Entry too big, skipped");
849 static int accept_connection(const char* type, int fd, SocketAddress *addr) {
852 log_debug("Accepting new %s connection on fd:%d", type, fd);
853 fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
855 log_error("accept() on fd:%d failed: %m", fd);
859 switch(socket_address_family(addr)) {
862 char* _cleanup_free_ a = NULL;
864 r = socket_address_print(addr, &a);
866 log_error("socket_address_print(): %s", strerror(-r));
871 log_info("Accepted %s %s connection from %s",
873 socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
879 log_error("Rejected %s connection with unsupported family %d",
880 type, socket_address_family(addr));
887 static int dispatch_raw_connection_event(sd_event_source *event,
891 RemoteServer *s = userdata;
893 SocketAddress addr = {
894 .size = sizeof(union sockaddr_union),
898 fd2 = accept_connection("raw", fd, &addr);
902 return add_source(s, fd2, NULL);
905 /**********************************************************************
906 **********************************************************************
907 **********************************************************************/
909 static int help(void) {
910 printf("%s [OPTIONS...]\n\n"
911 "Write external journal events to a journal file.\n\n"
913 " --url=URL Read events from systemd-journal-gatewayd at URL\n"
914 " --getter=COMMAND Read events from the output of COMMAND\n"
915 " --listen-raw=ADDR Listen for connections at ADDR\n"
916 " --listen-http=ADDR Listen for HTTP connections at ADDR\n"
917 " --listen-https=ADDR Listen for HTTPS connections at ADDR\n"
918 " --stdin Read events from standard input\n"
919 " -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n"
920 " --[no-]compress Use XZ-compression in the output journal (default: yes)\n"
921 " --[no-]seal Use Event sealing in the output journal (default: no)\n"
922 " -h --help Show this help and exit\n"
923 " --version Print version string and exit\n"
925 "Note: file descriptors from sd_listen_fds() will be consumed, too.\n"
926 , program_invocation_short_name);
931 static int parse_argv(int argc, char *argv[]) {
949 static const struct option options[] = {
950 { "help", no_argument, NULL, 'h' },
951 { "version", no_argument, NULL, ARG_VERSION },
952 { "url", required_argument, NULL, ARG_URL },
953 { "getter", required_argument, NULL, ARG_GETTER },
954 { "listen-raw", required_argument, NULL, ARG_LISTEN_RAW },
955 { "listen-http", required_argument, NULL, ARG_LISTEN_HTTP },
956 { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS },
957 { "stdin", no_argument, NULL, ARG_STDIN },
958 { "output", required_argument, NULL, 'o' },
959 { "compress", no_argument, NULL, ARG_COMPRESS },
960 { "no-compress", no_argument, NULL, ARG_NO_COMPRESS },
961 { "seal", no_argument, NULL, ARG_SEAL },
962 { "no-seal", no_argument, NULL, ARG_NO_SEAL },
963 { "key", required_argument, NULL, ARG_KEY },
964 { "cert", required_argument, NULL, ARG_CERT },
965 { "trust", required_argument, NULL, ARG_TRUST },
974 while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0)
981 puts(PACKAGE_STRING);
982 puts(SYSTEMD_FEATURES);
987 log_error("cannot currently set more than one --url");
996 log_error("cannot currently use --getter more than once");
1000 arg_getter = optarg;
1003 case ARG_LISTEN_RAW:
1004 if (arg_listen_raw) {
1005 log_error("cannot currently use --listen-raw more than once");
1009 arg_listen_raw = optarg;
1012 case ARG_LISTEN_HTTP:
1013 if (arg_listen_http) {
1014 log_error("cannot currently use --listen-http more than once");
1018 arg_listen_http = optarg;
1021 case ARG_LISTEN_HTTPS:
1022 if (arg_listen_https) {
1023 log_error("cannot currently use --listen-https more than once");
1027 arg_listen_https = optarg;
1032 log_error("Key file specified twice");
1035 r = read_full_file(optarg, &key_pem, NULL);
1037 log_error("Failed to read key file: %s", strerror(-r));
1045 log_error("Certificate file specified twice");
1048 r = read_full_file(optarg, &cert_pem, NULL);
1050 log_error("Failed to read certificate file: %s", strerror(-r));
1059 log_error("CA certificate file specified twice");
1062 r = read_full_file(optarg, &trust_pem, NULL);
1064 log_error("Failed to read CA certificate file: %s", strerror(-r));
1070 log_error("Option --trust is not available.");
1079 log_error("cannot use --output/-o more than once");
1083 arg_output = optarg;
1087 arg_compress = true;
1089 case ARG_NO_COMPRESS:
1090 arg_compress = false;
1103 log_error("Unknown option code %c", c);
1107 if (arg_listen_https && !(key_pem && cert_pem)) {
1108 log_error("Options --key and --cert must be used when https sources are specified");
1112 if (optind < argc) {
1113 log_error("This program takes no positional arguments");
1117 return 1 /* work to do */;
1120 static int setup_gnutls_logger(void) {
1121 if (!arg_listen_http && !arg_listen_https)
1125 gnutls_global_set_log_function(log_func_gnutls);
1126 gnutls_global_set_log_level(GNUTLS_LOG_LEVEL);
1132 int main(int argc, char **argv) {
1133 RemoteServer s = {};
1136 log_set_max_level(LOG_DEBUG);
1137 log_show_color(true);
1138 log_parse_environment();
1140 r = parse_argv(argc, argv);
1142 return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1144 r = setup_gnutls_logger();
1146 return EXIT_FAILURE;
1148 if (remoteserver_init(&s) < 0)
1149 return EXIT_FAILURE;
1151 log_debug("%s running as pid %lu",
1152 program_invocation_short_name, (unsigned long) getpid());
1155 "STATUS=Processing requests...");
1158 r = sd_event_get_state(s.events);
1161 if (r == SD_EVENT_FINISHED)
1164 r = sd_event_run(s.events, -1);
1166 log_error("Failed to run event loop: %s", strerror(-r));
1171 log_info("Finishing after writing %" PRIu64 " entries", s.writer.seqnum);
1172 r2 = server_destroy(&s);
1174 sd_notify(false, "STATUS=Shutting down...");
1176 return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;