1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2012 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
28 #include <sys/prctl.h>
29 #include <sys/socket.h>
31 #include <sys/types.h>
35 #include "sd-daemon.h"
37 #include "journal-file.h"
38 #include "journald-native.h"
39 #include "socket-util.h"
45 #include "socket-util.h"
46 #include "microhttpd-util.h"
49 #include <gnutls/gnutls.h>
52 #include "journal-remote-parse.h"
53 #include "journal-remote-write.h"
/* Default output path template: open_output() fills in the machine ID and a
 * per-source name. */
55 #define REMOTE_JOURNAL_PATH "/var/log/journal/" SD_ID128_FORMAT_STR "/remote-%s.journal"
/* Settings filled in by parse_argv(). The assignments visible there store
 * optarg (or argv + optind for arg_files) directly, without copying. */
57 static char* arg_output = NULL;
58 static char* arg_url = NULL;
59 static char* arg_getter = NULL;
60 static char* arg_listen_raw = NULL;
61 static char* arg_listen_http = NULL;
62 static char* arg_listen_https = NULL;
63 static char** arg_files = NULL;
/* Passed to journal_file_open_reliably() and process_source(). */
64 static int arg_compress = true;
65 static int arg_seal = false;
/* Compared against the descriptors returned by sd_listen_fds() in
 * remoteserver_init(); parse_argv() treats a value >= 0 as "already set". */
66 static int http_socket = -1, https_socket = -1;
/* File contents loaded with read_full_file() in parse_argv() and handed to
 * libmicrohttpd as HTTPS key / certificate / trust options in
 * setup_microhttpd_server(). */
68 static char *key_pem = NULL;
69 static char *cert_pem = NULL;
70 static char *trust_pem = NULL;
72 /**********************************************************************
73 **********************************************************************
74 **********************************************************************/
/* Run `child` with argument vector `argv`, with the child's stdout redirected
 * into a pipe (see the dup2() of fd[1] onto STDOUT_FILENO below).
 *
 * NOTE(review): this listing omits lines (pipe creation, fork(), the exec call
 * and every return statement), so the return value cannot be read off here.
 * Callers further up store the result in a variable named `fd`, so presumably
 * it is the read end of the pipe on success and negative on failure -- confirm
 * against the complete file.
 * NOTE(review): the first error message talks about a "pager pipe"; nothing
 * visible in this function deals with a pager, so the wording looks like a
 * leftover from copied code. */
76 static int spawn_child(const char* child, char** argv) {
78 pid_t parent_pid, child_pid;
82 log_error("Failed to create pager pipe: %m");
/* Remembered so the child can compare it with getppid() below. */
86 parent_pid = getpid();
91 log_error("Failed to fork: %m");
/* The write end of the pipe becomes the child's standard output. */
98 r = dup2(fd[1], STDOUT_FILENO);
100 log_error("Failed to dup pipe to stdout: %m");
106 log_warning("Failed to close pipe fds: %m");
108 /* Make sure the child goes away when the parent dies */
109 if (prctl(PR_SET_PDEATHSIG, SIGTERM) < 0)
112 /* Check whether our parent died before we were able
113 * to set the death signal */
114 if (getppid() != parent_pid)
118 log_error("Failed to exec child %s: %m", child);
124 log_warning("Failed to close write end of pipe: %m");
/* Start "curl" through spawn_child() with an argument vector that begins with
 * an "Accept: application/vnd.fdo.journal" header option.
 *
 * NOTE(review): the remaining argv entries (presumably including `url`) and the
 * return statement are not visible in this listing.
 * NOTE(review): the error message uses %m; whether errno is still meaningful
 * after spawn_child() fails depends on code not shown here -- verify. */
129 static int spawn_curl(char* url) {
130 char **argv = STRV_MAKE("curl",
131 "-HAccept: application/vnd.fdo.journal",
137 r = spawn_child("curl", argv);
139 log_error("Failed to spawn curl: %m");
/* Split the `getter` command line into words with strv_split_quoted() and run
 * the first word as the program via spawn_child(), passing all words as argv.
 * `words` is released automatically (_cleanup_strv_free_).
 *
 * NOTE(review): how `url` is used (presumably appended to `words`) and the
 * handling of a NULL/empty split result are not visible in this listing. */
143 static int spawn_getter(char *getter, char *url) {
145 char _cleanup_strv_free_ **words = NULL;
148 words = strv_split_quoted(getter);
152 r = spawn_child(words[0], words);
154 log_error("Failed to spawn getter %s: %m", getter);
/* Open the output journal for writer `s`.
 *
 * The visible code derives a file-name component from `url` (scanning for
 * '/', ':', ' ' and '?'), then picks the path in one of three ways:
 *   - REMOTE_JOURNAL_PATH formatted with the machine ID and the name,
 *   - "<arg_output>/remote-<name>.journal" (after an is_dir(arg_output) check),
 *   - a plain copy of arg_output.
 * The conditions selecting between these are not visible in this listing.
 * The file is opened with journal_file_open_reliably() using O_RDWR|O_CREAT,
 * mode 0640 and the arg_compress/arg_seal settings.
 *
 * NOTE(review): `name` carries _cleanup_free_ but has no initializer on the
 * declaration line below; if any return can happen before it is assigned, the
 * cleanup would free an indeterminate pointer -- check the full function.
 * NOTE(review): the "Failed to open output journal" message prints arg_output,
 * while the path actually opened is `output`; these differ in the first two
 * cases above, and arg_output defaults to NULL. */
159 static int open_output(Writer *s, const char* url) {
160 char _cleanup_free_ *name, *output = NULL;
/* Walk the name and act on characters that should not appear in a file name;
 * the replacement statements themselves are not visible here. */
169 for(c = name; *c; c++) {
170 if (*c == '/' || *c == ':' || *c == ' ')
172 else if (*c == '?') {
180 r = sd_id128_get_machine(&machine);
182 log_error("failed to determine machine ID128: %s", strerror(-r));
186 r = asprintf(&output, REMOTE_JOURNAL_PATH,
187 SD_ID128_FORMAT_VAL(machine), name);
191 r = is_dir(arg_output);
193 r = asprintf(&output,
194 "%s/remote-%s.journal", arg_output, name);
198 output = strdup(arg_output);
204 r = journal_file_open_reliably(output,
205 O_RDWR|O_CREAT, 0640,
206 arg_compress, arg_seal,
211 log_error("Failed to open output journal %s: %s",
212 arg_output, strerror(-r));
214 log_info("Opened output file %s", s->journal->path);
218 /**********************************************************************
219 **********************************************************************
220 **********************************************************************/
/* Per-listener bookkeeping for one libmicrohttpd daemon: the daemon handle and
 * the sd-event source registered for it in setup_microhttpd_server().
 * NOTE(review): an `fd` member is used elsewhere (d->fd is assigned and used as
 * the hashmap key) but its declaration is not visible in this listing. */
222 typedef struct MHDDaemonWrapper {
224 struct MHD_Daemon *daemon;
226 sd_event_source *event;
/* Overall server state.
 * NOTE(review): members referenced elsewhere in the file (events, writer,
 * daemons, active) are not visible in this listing. */
229 typedef struct RemoteServer {
/* Array indexed by file descriptor (see get_source_for_fd()); entries may be
 * NULL. sources_size is the number of slots. */
230 RemoteSource **sources;
231 ssize_t sources_size;
/* Event sources for SIGTERM/SIGINT (setup_signals()) and for the raw listening
 * socket (add_raw_socket()). */
235 sd_event_source *sigterm_event, *sigint_event, *listen_event;
242 /* This should go away as soon as µhttpd allows state to be passed around. */
/* File-scope pointer to the server; process_http_upload() reaches the writer
 * through it, and remoteserver_init() asserts it is still NULL on entry. */
243 static RemoteServer *server;
/* Forward declarations of the event callbacks defined further down; the
 * remaining parameter lines of each prototype are not visible in this listing. */
245 static int dispatch_raw_source_event(sd_event_source *event,
249 static int dispatch_raw_connection_event(sd_event_source *event,
253 static int dispatch_http_event(sd_event_source *event,
/* Look up, creating on demand, the RemoteSource slot for `fd`.
 *
 * Grows s->sources so that index `fd` is valid, allocates a zeroed
 * RemoteSource for an empty slot (marking it unused with fd = -1), and stores
 * the slot pointer in *source. Return statements are not visible in this
 * listing; callers treat a negative result as failure. */
258 static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) {
262 if (!GREEDY_REALLOC0_T(s->sources, s->sources_size, fd + 1))
265 if (s->sources[fd] == NULL) {
266 s->sources[fd] = new0(RemoteSource, 1);
/* A negative fd marks the freshly created source as not yet attached;
 * add_source() asserts on this. */
269 s->sources[fd]->fd = -1;
273 *source = s->sources[fd];
/* Detach the source stored for `fd` and clear its slot in s->sources.
 * `fd` must be within the current array bounds (asserted).
 * NOTE(review): the statements that release `source` and the return value are
 * not visible in this listing. */
277 static int remove_source(RemoteServer *s, int fd) {
278 RemoteSource *source;
282 assert(fd < s->sources_size);
284 source = s->sources[fd];
287 s->sources[fd] = NULL;
/* Register `fd` as a raw input source.
 *
 * A display name is built either by copying `name` or by formatting "fd:%d"
 * (the condition choosing between the two is not visible here). The source
 * slot is obtained with get_source_for_fd() and `fd` is added to the event loop
 * for EPOLLIN with dispatch_raw_source_event as callback.
 *
 * Returns 1 on success; the failure path calls remove_source(s, fd).
 *
 * NOTE(review): callers in this file pass name == NULL, and the log_debug() and
 * log_error() calls below hand `name` (not `realname`) to "%s". */
296 static int add_source(RemoteServer *s, int fd, const char* name) {
297 RemoteSource *source = NULL;
305 realname = strdup(name);
309 r = asprintf(&realname, "fd:%d", fd);
314 log_debug("Creating source for fd:%d (%s)", fd, name);
316 r = get_source_for_fd(s, fd, &source);
318 log_error("Failed to create source for fd:%d (%s)", fd, name);
/* A slot that is already attached to an fd must not be reused. */
322 assert(source->fd < 0);
325 r = sd_event_add_io(s->events, &source->event,
326 fd, EPOLLIN, dispatch_raw_source_event, s);
328 log_error("Failed to register event source for fd:%d: %s",
333 return 1; /* work to do */
336 remove_source(s, fd);
/* Watch listening socket `fd` for EPOLLIN; incoming connections are handled by
 * dispatch_raw_connection_event.
 * NOTE(review): the event source is always stored in the single
 * s->listen_event member, and remoteserver_init() can reach this function once
 * per passed listening socket plus once for --listen-raw; check whether an
 * earlier event source reference is lost when this is called more than once. */
340 static int add_raw_socket(RemoteServer *s, int fd) {
343 r = sd_event_add_io(s->events, &s->listen_event, fd, EPOLLIN,
344 dispatch_raw_connection_event, s);
/* Create a stream socket (SOCK_CLOEXEC) for `address` with make_socket_fd()
 * and register it through add_raw_socket(). The error check on `fd` is not
 * visible in this listing. */
354 static int setup_raw_socket(RemoteServer *s, const char *address) {
357 fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
361 return add_raw_socket(s, fd);
364 /**********************************************************************
365 **********************************************************************
366 **********************************************************************/
/* Return the RemoteSource attached to an HTTP connection through
 * *connection_cls, allocating a zeroed one and storing it there when needed.
 * request_handler() treats a NULL result as out-of-memory.
 * NOTE(review): the condition guarding the early return and the allocation
 * failure check are not visible in this listing. */
368 static RemoteSource *request_meta(void **connection_cls) {
369 RemoteSource *source;
371 assert(connection_cls);
373 return *connection_cls;
375 source = new0(RemoteSource, 1);
380 log_debug("Added RemoteSource as connection metadata %p", source);
382 *connection_cls = source;
/* Request-completed callback (installed with MHD_OPTION_NOTIFY_COMPLETED in
 * setup_microhttpd_server()). Logs the per-connection metadata pointer and
 * resets *connection_cls to NULL.
 * NOTE(review): the declaration of `s` and the statement that releases it are
 * not visible in this listing. */
386 static void request_meta_free(void *cls,
387 struct MHD_Connection *connection,
388 void **connection_cls,
389 enum MHD_RequestTerminationCode toe) {
392 assert(connection_cls);
395 log_debug("Cleaning up connection metadata %p", s);
397 *connection_cls = NULL;
/* Handle one upload callback for an HTTP connection.
 *
 * When *upload_data_size is non-zero the bytes are appended to `source` with
 * push_data() and *upload_data_size is set to 0. The buffered data is then fed
 * through process_source() into the global server's writer. Once the upload is
 * finished, leftover unparsed data yields MHD_HTTP_EXPECTATION_FAILED,
 * otherwise MHD_HTTP_ACCEPTED with body "OK.\n".
 *
 * The return value is whatever the mhd_respond*() helper returns on the paths
 * that respond; the other return statements are not visible in this listing.
 *
 * NOTE(review): in the visible lines a push_data() failure is answered with
 * mhd_respond_oom() while the log message prints strerror(-r); if push_data()
 * can fail for reasons other than memory, the response is misleading. */
400 static int process_http_upload(
401 struct MHD_Connection *connection,
402 const char *upload_data,
403 size_t *upload_data_size,
404 RemoteSource *source) {
406 bool finished = false;
411 log_debug("request_handler_upload: connection %p, %zu bytes",
412 connection, *upload_data_size);
414 if (*upload_data_size) {
415 log_info("Received %zu bytes", *upload_data_size);
417 r = push_data(source, upload_data, *upload_data_size);
419 log_error("Failed to store received data of size %zu: %s",
420 *upload_data_size, strerror(-r));
421 return mhd_respond_oom(connection);
/* Setting the size to 0 reports the whole chunk as consumed. */
423 *upload_data_size = 0;
428 r = process_source(source, &server->writer, arg_compress, arg_seal);
430 log_warning("Entry too big, skipped");
431 else if (r == -EAGAIN || r == -EWOULDBLOCK)
434 log_warning("Failed to process data for connection %p", connection);
435 return mhd_respondf(connection, MHD_HTTP_UNPROCESSABLE_ENTITY,
436 "Processing failed: %s", strerror(-r));
443 /* The upload is finished */
445 if (source_non_empty(source)) {
446 log_warning("EOF reached with incomplete data");
447 return mhd_respond(connection, MHD_HTTP_EXPECTATION_FAILED,
448 "Trailing data not processed.");
451 return mhd_respond(connection, MHD_HTTP_ACCEPTED, "OK.\n");
/* libmicrohttpd access handler (passed to MHD_start_daemon()).
 *
 * Visible behaviour: a connection is handed to process_http_upload() on one
 * path (its guarding condition is not visible here); otherwise the request is
 * validated -- method must be "POST", URL must be "/upload", and Content-Type
 * must be "application/vnd.fdo.journal" -- then check_permissions() runs and
 * per-connection metadata is created with request_meta().
 *
 * NOTE(review): an unsupported method is answered with
 * MHD_HTTP_METHOD_NOT_ACCEPTABLE; confirm this is the intended status rather
 * than a "method not allowed" code.
 * NOTE(review): the allocation-failure path calls respond_oom(), whereas
 * process_http_upload() uses mhd_respond_oom(); confirm both names exist. */
454 static int request_handler(
456 struct MHD_Connection *connection,
460 const char *upload_data,
461 size_t *upload_data_size,
462 void **connection_cls) {
468 assert(connection_cls);
472 log_debug("Handling a connection %s %s %s", method, url, version);
475 return process_http_upload(connection,
476 upload_data, upload_data_size,
479 if (!streq(method, "POST"))
480 return mhd_respond(connection, MHD_HTTP_METHOD_NOT_ACCEPTABLE,
481 "Unsupported method.\n");
483 if (!streq(url, "/upload"))
484 return mhd_respond(connection, MHD_HTTP_NOT_FOUND,
487 header = MHD_lookup_connection_value(connection,
488 MHD_HEADER_KIND, "Content-Type");
489 if (!header || !streq(header, "application/vnd.fdo.journal"))
490 return mhd_respond(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
491 "Content-Type: application/vnd.fdo.journal"
495 r = check_permissions(connection, &code);
500 if (!request_meta(connection_cls))
501 return respond_oom(connection);
/* Start a libmicrohttpd daemon on the already-open listening socket `fd` and
 * hook it into the sd-event loop of `s`.
 *
 * Steps visible below: make `fd` non-blocking; append key/cert (and trust)
 * PEM options and set MHD_USE_SSL (presumably only when `https` is true -- the
 * guarding conditions are not visible in this listing); allocate a
 * wrapper; start the daemon with request_handler; query the daemon info and
 * register the resulting descriptor for EPOLLIN with dispatch_http_event;
 * finally store the wrapper in s->daemons keyed by its fd. The error path
 * stops the daemon again.
 *
 * NOTE(review): the daemon info is requested with
 * MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY but the value is read from
 * info->listen_fd; check against the libmicrohttpd headers in use whether a
 * dedicated epoll member should be read instead. */
505 static int setup_microhttpd_server(RemoteServer *s, int fd, bool https) {
506 struct MHD_OptionItem opts[] = {
507 { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free},
508 { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger},
509 { MHD_OPTION_LISTEN_SOCKET, fd},
517 MHD_USE_PEDANTIC_CHECKS |
518 MHD_USE_EPOLL_LINUX_ONLY |
521 const union MHD_DaemonInfo *info;
527 r = fd_nonblock(fd, true);
529 log_error("Failed to make fd:%d nonblocking: %s", fd, strerror(-r));
/* Further options are written at opts[opts_pos++]; the array must therefore
 * have spare entries beyond the initial ones (not visible here). */
534 opts[opts_pos++] = (struct MHD_OptionItem)
535 {MHD_OPTION_HTTPS_MEM_KEY, 0, key_pem};
536 opts[opts_pos++] = (struct MHD_OptionItem)
537 {MHD_OPTION_HTTPS_MEM_CERT, 0, cert_pem};
539 flags |= MHD_USE_SSL;
542 opts[opts_pos++] = (struct MHD_OptionItem)
543 {MHD_OPTION_HTTPS_MEM_TRUST, 0, trust_pem};
546 d = new(MHDDaemonWrapper, 1);
/* Stored as uint64_t because the hashmap below uses uint64 hash/compare
 * functions on &d->fd. */
550 d->fd = (uint64_t) fd;
552 d->daemon = MHD_start_daemon(flags, 0,
554 request_handler, NULL,
555 MHD_OPTION_ARRAY, opts,
558 log_error("Failed to start µhttp daemon");
563 log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)",
564 https ? "HTTPS" : "HTTP", fd, d);
567 info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY);
569 log_error("µhttp returned NULL daemon info");
574 epoll_fd = info->listen_fd;
576 log_error("µhttp epoll fd is invalid");
581 r = sd_event_add_io(s->events, &d->event,
582 epoll_fd, EPOLLIN, dispatch_http_event, d);
584 log_error("Failed to add event callback: %s", strerror(-r));
588 r = hashmap_ensure_allocated(&s->daemons, uint64_hash_func, uint64_compare_func);
594 r = hashmap_put(s->daemons, &d->fd, d);
596 log_error("Failed to add daemon to hashmap: %s", strerror(-r));
604 MHD_stop_daemon(d->daemon);
/* Create a stream socket (SOCK_CLOEXEC) for `address` and start an HTTP or
 * HTTPS daemon on it via setup_microhttpd_server(). The remaining parameters
 * and the error check on `fd` are not visible in this listing. */
610 static int setup_microhttpd_socket(RemoteServer *s,
615 fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
619 return setup_microhttpd_server(s, fd, https);
/* sd-event I/O callback for a libmicrohttpd daemon: `userdata` is the
 * MHDDaemonWrapper registered in setup_microhttpd_server(). Runs MHD_run() on
 * the daemon and returns 1 on the visible success path.
 * NOTE(review): the function name is logged at info level on every wakeup,
 * which looks like debugging output.
 * NOTE(review): per the XXX below, a failing daemon is not unregistered. */
622 static int dispatch_http_event(sd_event_source *event,
626 MHDDaemonWrapper *d = userdata;
631 log_info("%s", __func__);
633 r = MHD_run(d->daemon);
635 log_error("MHD_run failed!");
636 // XXX: unregister daemon
640 return 1; /* work to do */
643 /**********************************************************************
644 **********************************************************************
645 **********************************************************************/
/* Signal callback, registered for both SIGTERM and SIGINT in setup_signals():
 * logs the received signal and asks the event loop to exit with code 0.
 * `userdata` is the RemoteServer. */
647 static int dispatch_sigterm(sd_event_source *event,
648 const struct signalfd_siginfo *si,
650 RemoteServer *s = userdata;
654 log_received_signal(LOG_INFO, si);
656 sd_event_exit(s->events, 0);
/* Block SIGINT and SIGTERM for the process and route both through the event
 * loop to dispatch_sigterm().
 * Note that sigprocmask() is called with SIG_SETMASK, so the process signal
 * mask is replaced by exactly {SIGINT, SIGTERM} rather than extended.
 * Error handling of the two sd_event_add_signal() calls is not visible in this
 * listing. */
660 static int setup_signals(RemoteServer *s) {
666 assert_se(sigemptyset(&mask) == 0);
667 sigset_add_many(&mask, SIGINT, SIGTERM, -1);
668 assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
670 r = sd_event_add_signal(s->events, &s->sigterm_event, SIGTERM, dispatch_sigterm, s);
674 r = sd_event_add_signal(s->events, &s->sigint_event, SIGINT, dispatch_sigterm, s);
/* Parse `spec` as an integer with safe_atoi().
 * NOTE(review): the rest of the body is not visible in this listing; judging
 * from parse_argv(), which treats -ENOENT from an unseen call as "not an fd,
 * use as address", this presumably validates a file-descriptor number --
 * confirm against the complete file. */
681 static int fd_fd(const char *spec) {
684 r = safe_atoi(spec, &fd);
/* Set up all configured inputs and the output journal for server `s`.
 *
 * In order, the visible code:
 *   1. obtains the default event loop;
 *   2. takes over descriptors from sd_listen_fds(): listening sockets become
 *      HTTP/HTTPS daemons when they match http_socket/https_socket and raw
 *      listeners otherwise, connected sockets become sources;
 *   3. for --url, builds "<arg_url>/entries" and spawns either the getter or
 *      curl, adding the resulting descriptor as a source;
 *   4. sets up --listen-raw, --listen-http and --listen-https listeners;
 *   5. adds every entry of arg_files ("-" is announced as standard input);
 *   6. fails when no source is active, then initialises the writer and opens
 *      the output journal named after `output_name`.
 * Most error checks and all return statements are not visible in this listing.
 *
 * NOTE(review): the return value of sd_event_default() is not checked.
 * NOTE(review): the test that selects output_name = "multiple" has no
 * comparison operator, so it is true whenever any single term is non-zero; and
 * parse_argv() sets arg_files = argv + optind, which is non-NULL even with no
 * file arguments. The intent appears to be "more than one kind of input";
 * arg_listen_http/arg_listen_https are also not counted. */
695 static int remoteserver_init(RemoteServer *s) {
697 const char *output_name = NULL;
702 sd_event_default(&s->events);
706 assert(server == NULL);
709 n = sd_listen_fds(true);
711 log_error("Failed to read listening file descriptors from environment: %s",
715 log_info("Received %d descriptors", n);
/* http_socket/https_socket name specific passed descriptors; they must lie in
 * the range actually received. */
717 if (MAX(http_socket, https_socket) >= SD_LISTEN_FDS_START + n) {
718 log_error("Received fewer sockets than expected");
722 for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
723 if (sd_is_socket(fd, AF_UNSPEC, 0, false)) {
724 log_info("Received a listening socket (fd:%d)", fd);
726 if (fd == http_socket)
727 r = setup_microhttpd_server(s, fd, false);
728 else if (fd == https_socket)
729 r = setup_microhttpd_server(s, fd, true);
731 r = add_raw_socket(s, fd);
732 } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
733 log_info("Received a connection socket (fd:%d)", fd);
735 r = add_source(s, fd, NULL);
737 log_error("Unknown socket passed on fd:%d", fd);
743 log_error("Failed to register socket (fd:%d): %s",
748 output_name = "socket";
752 char _cleanup_free_ *url = NULL;
753 char _cleanup_strv_free_ **urlv = strv_new(arg_url, "/entries", NULL);
756 url = strv_join(urlv, "");
761 log_info("Spawning getter %s...", url);
762 fd = spawn_getter(arg_getter, url);
764 log_info("Spawning curl %s...", url);
765 fd = spawn_curl(url);
770 r = add_source(s, fd, arg_url);
774 output_name = arg_url;
777 if (arg_listen_raw) {
778 log_info("Listening on a socket...");
779 r = setup_raw_socket(s, arg_listen_raw);
783 output_name = arg_listen_raw;
786 if (arg_listen_http) {
787 r = setup_microhttpd_socket(s, arg_listen_http, false);
791 output_name = arg_listen_http;
794 if (arg_listen_https) {
795 r = setup_microhttpd_socket(s, arg_listen_https, true);
799 output_name = arg_listen_https;
802 STRV_FOREACH(file, arg_files) {
803 if (streq(*file, "-")) {
804 log_info("Reading standard input...");
807 output_name = "stdin";
809 log_info("Reading file %s...", *file);
811 fd = open(*file, O_RDONLY|O_CLOEXEC|O_NOCTTY|O_NONBLOCK);
813 log_error("Failed to open %s: %m", *file);
819 r = add_source(s, fd, output_name);
824 if (s->active == 0) {
825 log_error("Zarro sources specified");
829 if (!!n + !!arg_url + !!arg_listen_raw + !!arg_files)
830 output_name = "multiple";
832 r = writer_init(&s->writer);
836 r = open_output(&s->writer, output_name);
/* Tear down server `s`: close the writer, stop every libmicrohttpd daemon and
 * drop its event source, free the daemon hashmap, walk all source slots, and
 * release the signal/listen event sources and the event loop.
 * As the trailing comment says, the listening descriptors themselves are left
 * open. The per-source cleanup statement inside the loop and the return value
 * are not visible in this listing. */
840 static int server_destroy(RemoteServer *s) {
845 r = writer_close(&s->writer);
847 while ((d = hashmap_steal_first(s->daemons))) {
848 MHD_stop_daemon(d->daemon);
849 sd_event_source_unref(d->event);
853 hashmap_free(s->daemons);
855 assert(s->sources_size == 0 || s->sources);
856 for (i = 0; i < s->sources_size; i++)
861 sd_event_source_unref(s->sigterm_event);
862 sd_event_source_unref(s->sigint_event);
863 sd_event_source_unref(s->listen_event);
864 sd_event_unref(s->events);
866 /* fds that we're listening on remain open... */
871 /**********************************************************************
872 **********************************************************************
873 **********************************************************************/
/* sd-event I/O callback for a raw source descriptor (`userdata` is the
 * RemoteServer). Looks up the source by fd, runs process_source() on it, and
 * on end-of-file logs, warns about leftover partial data and removes the
 * source. An -E2BIG result is logged as a skipped entry.
 * Return statements and the remaining branches are not visible in this
 * listing.
 * NOTE(review): s->active is printed with %zd; its declared type is not
 * visible here -- confirm it matches the format. */
875 static int dispatch_raw_source_event(sd_event_source *event,
880 RemoteServer *s = userdata;
881 RemoteSource *source;
884 assert(fd < s->sources_size);
885 source = s->sources[fd];
886 assert(source->fd == fd);
888 r = process_source(source, &s->writer, arg_compress, arg_seal);
/* The EOF state is checked before the return code of process_source(). */
889 if (source->state == STATE_EOF) {
890 log_info("EOF reached with source fd:%d (%s)",
891 source->fd, source->name);
892 if (source_non_empty(source))
893 log_warning("EOF reached with incomplete data");
894 remove_source(s, source->fd);
895 log_info("%zd active source remaining", s->active);
896 } else if (r == -E2BIG) {
897 log_error("Entry too big, skipped");
/* Accept one connection on listening socket `fd` (non-blocking, close-on-exec)
 * and fill `addr` with the peer address. `type` is only used in log messages.
 * Accepted address families get their address formatted and logged; the log
 * text distinguishes AF_INET ("IP") from everything else ("IPv6"). Other
 * families are logged as rejected.
 * Case labels, cleanup of fd2 on the error paths and the return statements
 * are not visible in this listing; the caller uses the result as the new fd. */
904 static int accept_connection(const char* type, int fd, SocketAddress *addr) {
907 log_debug("Accepting new %s connection on fd:%d", type, fd);
908 fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
910 log_error("accept() on fd:%d failed: %m", fd);
914 switch(socket_address_family(addr)) {
/* `a` receives the printable address and is freed automatically. */
917 char* _cleanup_free_ a = NULL;
919 r = socket_address_print(addr, &a);
921 log_error("socket_address_print(): %s", strerror(-r));
926 log_info("Accepted %s %s connection from %s",
928 socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
934 log_error("Rejected %s connection with unsupported family %d",
935 type, socket_address_family(addr));
/* sd-event I/O callback for the raw listening socket (`userdata` is the
 * RemoteServer): accepts a connection and registers the new descriptor as a
 * source without an explicit name. The check of fd2 between the two calls is
 * not visible in this listing. */
942 static int dispatch_raw_connection_event(sd_event_source *event,
946 RemoteServer *s = userdata;
/* accept4() needs the buffer size on input. */
948 SocketAddress addr = {
949 .size = sizeof(union sockaddr_union),
953 fd2 = accept_connection("raw", fd, &addr);
957 return add_source(s, fd2, NULL);
960 /**********************************************************************
961 **********************************************************************
962 **********************************************************************/
/* Print the usage text to standard output, with the program's short
 * invocation name substituted for the leading %s.
 * NOTE(review): the --output line mentions "DIR/external-*.journal", but
 * open_output() formats "%s/remote-%s.journal" for the directory case.
 * NOTE(review): the --key, --cert and --trust options present in the option
 * table of parse_argv() are not listed in the visible help lines. */
964 static int help(void) {
965 printf("%s [OPTIONS...] {FILE|-}...\n\n"
966 "Write external journal events to a journal file.\n\n"
968 " --url=URL Read events from systemd-journal-gatewayd at URL\n"
969 " --getter=COMMAND Read events from the output of COMMAND\n"
970 " --listen-raw=ADDR Listen for connections at ADDR\n"
971 " --listen-http=ADDR Listen for HTTP connections at ADDR\n"
972 " --listen-https=ADDR Listen for HTTPS connections at ADDR\n"
973 " -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n"
974 " --[no-]compress Use XZ-compression in the output journal (default: yes)\n"
975 " --[no-]seal Use Event sealing in the output journal (default: no)\n"
976 " -h --help Show this help and exit\n"
977 " --version Print version string and exit\n"
979 "Note: file descriptors from sd_listen_fds() will be consumed, too.\n"
980 , program_invocation_short_name);
/* Parse the command line into the file-scope arg_* / *_pem / *_socket
 * variables.
 *
 * Returns 0 when the program should exit successfully right away (help and
 * version paths) and 1 when there is work to do; main() treats any result
 * <= 0 as "stop". Error returns and most case labels are not visible in this
 * listing.
 *
 * Most options may be given only once (see the "more than once" / "specified
 * twice" messages). For --listen-http/--listen-https an unseen call yields
 * `r`; -ENOENT from it means the argument is kept as an address string,
 * otherwise (presumably) it names an already-open descriptor stored in
 * http_socket/https_socket.
 *
 * NOTE(review): the final consistency check requires --key and --cert only
 * when arg_listen_https is set; an HTTPS listener selected through
 * https_socket is not covered by the visible condition. */
985 static int parse_argv(int argc, char *argv[]) {
1002 static const struct option options[] = {
1003 { "help", no_argument, NULL, 'h' },
1004 { "version", no_argument, NULL, ARG_VERSION },
1005 { "url", required_argument, NULL, ARG_URL },
1006 { "getter", required_argument, NULL, ARG_GETTER },
1007 { "listen-raw", required_argument, NULL, ARG_LISTEN_RAW },
1008 { "listen-http", required_argument, NULL, ARG_LISTEN_HTTP },
1009 { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS },
1010 { "output", required_argument, NULL, 'o' },
1011 { "compress", no_argument, NULL, ARG_COMPRESS },
1012 { "no-compress", no_argument, NULL, ARG_NO_COMPRESS },
1013 { "seal", no_argument, NULL, ARG_SEAL },
1014 { "no-seal", no_argument, NULL, ARG_NO_SEAL },
1015 { "key", required_argument, NULL, ARG_KEY },
1016 { "cert", required_argument, NULL, ARG_CERT },
1017 { "trust", required_argument, NULL, ARG_TRUST },
/* Short options: -h, and -o with an argument. */
1026 while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0)
1030 return 0 /* done */;
1033 puts(PACKAGE_STRING);
1034 puts(SYSTEMD_FEATURES);
1035 return 0 /* done */;
1039 log_error("cannot currently set more than one --url");
1048 log_error("cannot currently use --getter more than once");
1052 arg_getter = optarg;
1055 case ARG_LISTEN_RAW:
1056 if (arg_listen_raw) {
1057 log_error("cannot currently use --listen-raw more than once");
1061 arg_listen_raw = optarg;
1064 case ARG_LISTEN_HTTP:
1065 if (arg_listen_http || http_socket >= 0) {
1066 log_error("cannot currently use --listen-http more than once");
1073 else if (r == -ENOENT)
1074 arg_listen_http = optarg;
1076 log_error("Invalid port/fd specification %s: %s",
1077 optarg, strerror(-r));
1083 case ARG_LISTEN_HTTPS:
1084 if (arg_listen_https || https_socket >= 0) {
1085 log_error("cannot currently use --listen-https more than once");
1092 else if (r == -ENOENT)
1093 arg_listen_https = optarg;
1095 log_error("Invalid port/fd specification %s: %s",
1096 optarg, strerror(-r));
1104 log_error("Key file specified twice");
/* The whole file is read into memory; the buffer is later passed to
 * libmicrohttpd in setup_microhttpd_server(). */
1107 r = read_full_file(optarg, &key_pem, NULL);
1109 log_error("Failed to read key file: %s", strerror(-r));
1117 log_error("Certificate file specified twice");
1120 r = read_full_file(optarg, &cert_pem, NULL);
1122 log_error("Failed to read certificate file: %s", strerror(-r));
1131 log_error("CA certificate file specified twice");
1134 r = read_full_file(optarg, &trust_pem, NULL);
1136 log_error("Failed to read CA certificate file: %s", strerror(-r));
1142 log_error("Option --trust is not available.");
1147 log_error("cannot use --output/-o more than once");
1151 arg_output = optarg;
1155 arg_compress = true;
1157 case ARG_NO_COMPRESS:
1158 arg_compress = false;
1171 log_error("Unknown option code %c", c);
1175 if (arg_listen_https && !(key_pem && cert_pem)) {
1176 log_error("Options --key and --cert must be used when https sources are specified");
/* Everything after the options is treated as input files; this pointer is
 * non-NULL even when no file arguments remain. */
1181 arg_files = argv + optind;
1183 return 1 /* work to do */;
/* Route GnuTLS log output through log_func_gnutls at GNUTLS_LOG_LEVEL.
 * Nothing is installed when neither --listen-http nor --listen-https was given
 * as an address (the statement under the `if` is not visible in this listing).
 * NOTE(review): listeners selected via http_socket/https_socket are not
 * considered by the visible condition. */
1186 static int setup_gnutls_logger(void) {
1187 if (!arg_listen_http && !arg_listen_https)
1191 gnutls_global_set_log_function(log_func_gnutls);
1192 gnutls_global_set_log_level(GNUTLS_LOG_LEVEL);
/* Program entry point: configure logging, parse the command line, set up
 * GnuTLS logging and the server, then run the event loop until it reports
 * SD_EVENT_FINISHED or fails, and finally tear the server down.
 * Exit status is EXIT_SUCCESS only when both the loop result and
 * server_destroy() are non-negative.
 * NOTE(review): the maximum log level is set to LOG_DEBUG unconditionally
 * before log_parse_environment() is called.
 * NOTE(review): the "STATUS=Shutting down..." notification is sent after
 * server_destroy() has already run. */
1198 int main(int argc, char **argv) {
1199 RemoteServer s = {};
1202 log_set_max_level(LOG_DEBUG);
1203 log_show_color(true);
1204 log_parse_environment();
1206 r = parse_argv(argc, argv);
/* parse_argv() returns 0 for "done" (help/version); that maps to success. */
1208 return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1210 r = setup_gnutls_logger();
1212 return EXIT_FAILURE;
1214 if (remoteserver_init(&s) < 0)
1215 return EXIT_FAILURE;
1217 log_debug("%s running as pid %lu",
1218 program_invocation_short_name, (unsigned long) getpid());
1221 "STATUS=Processing requests...");
1224 r = sd_event_get_state(s.events);
1227 if (r == SD_EVENT_FINISHED)
/* Block indefinitely (timeout -1) until an event is dispatched. */
1230 r = sd_event_run(s.events, -1);
1232 log_error("Failed to run event loop: %s", strerror(-r));
1237 log_info("Finishing after writing %" PRIu64 " entries", s.writer.seqnum);
1238 r2 = server_destroy(&s);
1240 sd_notify(false, "STATUS=Shutting down...");
1242 return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;