X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=blobdiff_plain;f=src%2Fjournal-remote%2Fjournal-upload.c;h=75bb434c0861f44a9ca924682c07416f57ac3dc4;hb=efe0286285a7432f738fafae840fa4eda51c2986;hp=538ba8b6502deb726c3f37632f54f39ea475d77b;hpb=7449bc1f34c206e3ff8e274cd74e2db950d492a1;p=elogind.git diff --git a/src/journal-remote/journal-upload.c b/src/journal-remote/journal-upload.c index 538ba8b65..75bb434c0 100644 --- a/src/journal-remote/journal-upload.c +++ b/src/journal-remote/journal-upload.c @@ -26,22 +26,42 @@ #include #include "sd-daemon.h" - #include "log.h" #include "util.h" #include "build.h" +#include "fileio.h" +#include "mkdir.h" +#include "conf-parser.h" +#include "sigbus.h" #include "journal-upload.h" -static const char* arg_url; - -static void close_fd_input(Uploader *u); +#define PRIV_KEY_FILE CERTIFICATE_ROOT "/private/journal-upload.pem" +#define CERT_FILE CERTIFICATE_ROOT "/certs/journal-upload.pem" +#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem" +#define DEFAULT_PORT 19532 +static const char* arg_url = NULL; static const char *arg_key = NULL; static const char *arg_cert = NULL; static const char *arg_trust = NULL; +static const char *arg_directory = NULL; +static char **arg_file = NULL; +static const char *arg_cursor = NULL; +static bool arg_after_cursor = false; +static int arg_journal_type = 0; +static const char *arg_machine = NULL; +static bool arg_merge = false; +static int arg_follow = -1; +static const char *arg_save_state = NULL; + +static void close_fd_input(Uploader *u); + +#define SERVER_ANSWER_KEEP 2048 + +#define STATE_FILE "/var/lib/systemd/journal-upload/state" #define easy_setopt(curl, opt, value, level, cmd) \ - { \ + do { \ code = curl_easy_setopt(curl, opt, value); \ if (code) { \ log_full(level, \ @@ -49,8 +69,106 @@ static const char *arg_trust = NULL; curl_easy_strerror(code)); \ cmd; \ } \ + } while(0) + +static size_t output_callback(char *buf, + size_t size, + size_t nmemb, + void *userp) { + Uploader *u = userp; + + assert(u); + + log_debug("The server answers (%zu bytes): %.*s", + size*nmemb, (int)(size*nmemb), buf); + + if (nmemb && !u->answer) { + u->answer = strndup(buf, size*nmemb); + if (!u->answer) + log_warning_errno(ENOMEM, "Failed to store server answer (%zu bytes): %m", + size*nmemb); } + return size * nmemb; +} + +static int check_cursor_updating(Uploader *u) { + _cleanup_free_ char *temp_path = NULL; + _cleanup_fclose_ FILE *f = NULL; + int r; + + if (!u->state_file) + return 0; + + r = mkdir_parents(u->state_file, 0755); + if (r < 0) + return log_error_errno(r, "Cannot create parent directory of state file %s: %m", + u->state_file); + + r = fopen_temporary(u->state_file, &f, &temp_path); + if (r < 0) + return log_error_errno(r, "Cannot save state to %s: %m", + u->state_file); + unlink(temp_path); + + return 0; +} + +static int update_cursor_state(Uploader *u) { + _cleanup_free_ char *temp_path = NULL; + _cleanup_fclose_ FILE *f = NULL; + int r; + + if (!u->state_file || !u->last_cursor) + return 0; + + r = fopen_temporary(u->state_file, &f, &temp_path); + if (r < 0) + goto finish; + + fprintf(f, + "# This is private data. Do not parse.\n" + "LAST_CURSOR=%s\n", + u->last_cursor); + + fflush(f); + + if (ferror(f) || rename(temp_path, u->state_file) < 0) { + r = -errno; + unlink(u->state_file); + unlink(temp_path); + } + +finish: + if (r < 0) + log_error_errno(r, "Failed to save state %s: %m", u->state_file); + + return r; +} + +static int load_cursor_state(Uploader *u) { + int r; + + if (!u->state_file) + return 0; + + r = parse_env_file(u->state_file, NEWLINE, + "LAST_CURSOR", &u->last_cursor, + NULL); + + if (r == -ENOENT) + log_debug("State file %s is not present.", u->state_file); + else if (r < 0) + return log_error_errno(r, "Failed to read state file %s: %m", + u->state_file); + else + log_debug("Last cursor was %s", u->last_cursor); + + return 0; +} + + + int start_upload(Uploader *u, size_t (*input_callback)(void *ptr, size_t size, @@ -97,6 +215,16 @@ int start_upload(Uploader *u, easy_setopt(curl, CURLOPT_POST, 1L, LOG_ERR, return -EXFULL); + easy_setopt(curl, CURLOPT_ERRORBUFFER, u->error, + LOG_ERR, return -EXFULL); + + /* set where to write to */ + easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_WRITEDATA, data, + LOG_ERR, return -EXFULL); + /* set where to read from */ easy_setopt(curl, CURLOPT_READFUNCTION, input_callback, LOG_ERR, return -EXFULL); @@ -108,24 +236,26 @@ int start_upload(Uploader *u, easy_setopt(curl, CURLOPT_HTTPHEADER, u->header, LOG_ERR, return -EXFULL); - /* enable verbose for easier tracing */ - easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, ); + if (_unlikely_(log_get_max_level() >= LOG_DEBUG)) + /* enable verbose for easier tracing */ + easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, ); easy_setopt(curl, CURLOPT_USERAGENT, "systemd-journal-upload " PACKAGE_STRING, LOG_WARNING, ); - if (arg_key) { - assert(arg_cert); - - easy_setopt(curl, CURLOPT_SSLKEY, arg_key, + if (arg_key || startswith(u->url, "https://")) { + easy_setopt(curl, CURLOPT_SSLKEY, arg_key ?: PRIV_KEY_FILE, LOG_ERR, return -EXFULL); - easy_setopt(curl, CURLOPT_SSLCERT, arg_cert, + easy_setopt(curl, CURLOPT_SSLCERT, arg_cert ?: CERT_FILE, LOG_ERR, return -EXFULL); } - if (arg_trust) - easy_setopt(curl, CURLOPT_CAINFO, arg_trust, + if (streq_ptr(arg_trust, "all")) + easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0, + LOG_ERR, return -EUCLEAN); + else if (arg_trust || startswith(u->url, "https://")) + easy_setopt(curl, CURLOPT_CAINFO, arg_trust ?: TRUST_FILE, LOG_ERR, return -EXFULL); if (arg_key || arg_trust) @@ -133,6 +263,12 @@ int start_upload(Uploader *u, LOG_WARNING, ); u->easy = curl; + } else { + /* truncate the potential old error message */ + u->error[0] = '\0'; + + free(u->answer); + u->answer = 0; } /* upload to this place */ @@ -160,7 +296,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user return 0; r = read(u->input, buf, size * nmemb); - log_debug("%s: allowed %zu, read %zu", __func__, size*nmemb, r); + log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, r); if (r > 0) return r; @@ -171,7 +307,7 @@ static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *user close_fd_input(u); return 0; } else { - log_error("Aborting transfer after read error on input: %m."); + log_error_errno(errno, "Aborting transfer after read error on input: %m."); return CURL_READFUNC_ABORT; } } @@ -182,6 +318,7 @@ static void close_fd_input(Uploader *u) { if (u->input >= 0) close_nointr(u->input); u->input = -1; + u->timeout = 0; } static int dispatch_fd_input(sd_event_source *event, @@ -191,9 +328,19 @@ static int dispatch_fd_input(sd_event_source *event, Uploader *u = userp; assert(u); - assert(revents & EPOLLIN); assert(fd >= 0); + if (revents & EPOLLHUP) { + log_debug("Received HUP"); + close_fd_input(u); + return 0; + } + + if (!(revents & EPOLLIN)) { + log_warning("Unexpected poll event %"PRIu32".", revents); + return -EINVAL; + } + if (u->uploading) { log_warning("dispatch_fd_input called when uploading, ignoring."); return 0; @@ -203,52 +350,112 @@ static int dispatch_fd_input(sd_event_source *event, } static int open_file_for_upload(Uploader *u, const char *filename) { - int fd, r; + int fd, r = 0; if (streq(filename, "-")) fd = STDIN_FILENO; else { fd = open(filename, O_RDONLY|O_CLOEXEC|O_NOCTTY); - if (fd < 0) { - log_error("Failed to open %s: %m", filename); - return -errno; - } + if (fd < 0) + return log_error_errno(errno, "Failed to open %s: %m", filename); } u->input = fd; - r = sd_event_add_io(u->events, &u->input_event, - fd, EPOLLIN, dispatch_fd_input, u); - if (r < 0) { - if (r != -EPERM) { - log_error("Failed to register input event: %s", strerror(-r)); - return r; - } + if (arg_follow) { + r = sd_event_add_io(u->events, &u->input_event, + fd, EPOLLIN, dispatch_fd_input, u); + if (r < 0) { + if (r != -EPERM || arg_follow > 0) + return log_error_errno(r, "Failed to register input event: %m"); - /* Normal files should just be consumed without polling. */ - r = start_upload(u, fd_input_callback, u); + /* Normal files should just be consumed without polling. */ + r = start_upload(u, fd_input_callback, u); + } } + return r; } -static int setup_uploader(Uploader *u, const char *url) { +static int dispatch_sigterm(sd_event_source *event, + const struct signalfd_siginfo *si, + void *userdata) { + Uploader *u = userdata; + + assert(u); + + log_received_signal(LOG_INFO, si); + + close_fd_input(u); + close_journal_input(u); + + sd_event_exit(u->events, 0); + return 0; +} + +static int setup_signals(Uploader *u) { + sigset_t mask; int r; + assert(u); + + assert_se(sigemptyset(&mask) == 0); + sigset_add_many(&mask, SIGINT, SIGTERM, -1); + assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); + + r = sd_event_add_signal(u->events, &u->sigterm_event, SIGTERM, dispatch_sigterm, u); + if (r < 0) + return r; + + r = sd_event_add_signal(u->events, &u->sigint_event, SIGINT, dispatch_sigterm, u); + if (r < 0) + return r; + + return 0; +} + +static int setup_uploader(Uploader *u, const char *url, const char *state_file) { + int r; + const char *host, *proto = ""; + assert(u); assert(url); memzero(u, sizeof(Uploader)); u->input = -1; - u->url = url; + if (!(host = startswith(url, "http://")) && !(host = startswith(url, "https://"))) { + host = url; + proto = "https://"; + } - r = sd_event_default(&u->events); - if (r < 0) { - log_error("sd_event_default failed: %s", strerror(-r)); - return r; + if (strchr(host, ':')) + u->url = strjoin(proto, url, "/upload", NULL); + else { + char *t; + size_t x; + + t = strdupa(url); + x = strlen(t); + while (x > 0 && t[x - 1] == '/') + t[x - 1] = '\0'; + + u->url = strjoin(proto, t, ":" STRINGIFY(DEFAULT_PORT), "/upload", NULL); } + if (!u->url) + return log_oom(); - return 0; + u->state_file = state_file; + + r = sd_event_default(&u->events); + if (r < 0) + return log_error_errno(r, "sd_event_default failed: %m"); + + r = setup_signals(u); + if (r < 0) + return log_error_errno(r, "Failed to set up signals: %m"); + + return load_cursor_state(u); } static void destroy_uploader(Uploader *u) { @@ -256,24 +463,106 @@ static void destroy_uploader(Uploader *u) { curl_easy_cleanup(u->easy); curl_slist_free_all(u->header); + free(u->answer); + + free(u->last_cursor); + free(u->current_cursor); + + free(u->url); u->input_event = sd_event_source_unref(u->input_event); close_fd_input(u); + close_journal_input(u); + sd_event_source_unref(u->sigterm_event); + sd_event_source_unref(u->sigint_event); sd_event_unref(u->events); } +static int perform_upload(Uploader *u) { + CURLcode code; + long status; + + assert(u); + + code = curl_easy_perform(u->easy); + if (code) { + if (u->error[0]) + log_error("Upload to %s failed: %.*s", + u->url, (int) sizeof(u->error), u->error); + else + log_error("Upload to %s failed: %s", + u->url, curl_easy_strerror(code)); + return -EIO; + } + + code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status); + if (code) { + log_error("Failed to retrieve response code: %s", + curl_easy_strerror(code)); + return -EUCLEAN; + } + + if (status >= 300) { + log_error("Upload to %s failed with code %ld: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else if (status < 200) { + log_error("Upload to %s finished with unexpected code %ld: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else + log_debug("Upload finished successfully with code %ld: %s", + status, strna(u->answer)); + + free(u->last_cursor); + u->last_cursor = u->current_cursor; + u->current_cursor = NULL; + + return update_cursor_state(u); +} + +static int parse_config(void) { + const ConfigTableItem items[] = { + { "Upload", "URL", config_parse_string, 0, &arg_url }, + { "Upload", "ServerKeyFile", config_parse_path, 0, &arg_key }, + { "Upload", "ServerCertificateFile", config_parse_path, 0, &arg_cert }, + { "Upload", "TrustedCertificateFile", config_parse_path, 0, &arg_trust }, + {}}; + + return config_parse_many(PKGSYSCONFDIR "/journal-upload.conf", + CONF_DIRS_NULSTR("systemd/journal-upload.conf"), + "Upload\0", config_item_table_lookup, items, + false, NULL); +} + static void help(void) { printf("%s -u URL {FILE|-}...\n\n" "Upload journal events to a remote server.\n\n" - "Options:\n" - " --url=URL Upload to this address\n" - " --key=FILENAME Specify key in PEM format\n" - " --cert=FILENAME Specify certificate in PEM format\n" - " --trust=FILENAME Specify CA certificate in PEM format\n" - " -h --help Show this help and exit\n" - " --version Print version string and exit\n" + " -h --help Show this help\n" + " --version Show package version\n" + " -u --url=URL Upload to this address (default port " + STRINGIFY(DEFAULT_PORT) ")\n" + " --key=FILENAME Specify key in PEM format (default:\n" + " \"" PRIV_KEY_FILE "\")\n" + " --cert=FILENAME Specify certificate in PEM format (default:\n" + " \"" CERT_FILE "\")\n" + " --trust=FILENAME|all Specify CA certificate or disable checking (default:\n" + " \"" TRUST_FILE "\")\n" + " --system Use the system journal\n" + " --user Use the user journal for the current user\n" + " -m --merge Use all available journals\n" + " -M --machine=CONTAINER Operate on local container\n" + " -D --directory=PATH Use journal files from directory\n" + " --file=PATH Use this journal file\n" + " --cursor=CURSOR Start at the specified cursor\n" + " --after-cursor=CURSOR Start after the specified cursor\n" + " --follow[=BOOL] Do [not] wait for input\n" + " --save-state[=FILE] Save uploaded cursors (default \n" + " " STATE_FILE ")\n" + " -h --help Show this help and exit\n" + " --version Print version string and exit\n" , program_invocation_short_name); } @@ -283,6 +572,13 @@ static int parse_argv(int argc, char *argv[]) { ARG_KEY, ARG_CERT, ARG_TRUST, + ARG_USER, + ARG_SYSTEM, + ARG_FILE, + ARG_CURSOR, + ARG_AFTER_CURSOR, + ARG_FOLLOW, + ARG_SAVE_STATE, }; static const struct option options[] = { @@ -292,17 +588,27 @@ static int parse_argv(int argc, char *argv[]) { { "key", required_argument, NULL, ARG_KEY }, { "cert", required_argument, NULL, ARG_CERT }, { "trust", required_argument, NULL, ARG_TRUST }, + { "system", no_argument, NULL, ARG_SYSTEM }, + { "user", no_argument, NULL, ARG_USER }, + { "merge", no_argument, NULL, 'm' }, + { "machine", required_argument, NULL, 'M' }, + { "directory", required_argument, NULL, 'D' }, + { "file", required_argument, NULL, ARG_FILE }, + { "cursor", required_argument, NULL, ARG_CURSOR }, + { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR }, + { "follow", optional_argument, NULL, ARG_FOLLOW }, + { "save-state", optional_argument, NULL, ARG_SAVE_STATE }, {} }; - int c; + int c, r; assert(argc >= 0); assert(argv); opterr = 0; - while ((c = getopt_long(argc, argv, "hu:", options, NULL)) >= 0) + while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0) switch(c) { case 'h': help(); @@ -349,6 +655,79 @@ static int parse_argv(int argc, char *argv[]) { arg_trust = optarg; break; + case ARG_SYSTEM: + arg_journal_type |= SD_JOURNAL_SYSTEM; + break; + + case ARG_USER: + arg_journal_type |= SD_JOURNAL_CURRENT_USER; + break; + + case 'm': + arg_merge = true; + break; + + case 'M': + if (arg_machine) { + log_error("cannot use more than one --machine/-M"); + return -EINVAL; + } + + arg_machine = optarg; + break; + + case 'D': + if (arg_directory) { + log_error("cannot use more than one --directory/-D"); + return -EINVAL; + } + + arg_directory = optarg; + break; + + case ARG_FILE: + r = glob_extend(&arg_file, optarg); + if (r < 0) + return log_error_errno(r, "Failed to add paths: %m"); + break; + + case ARG_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + break; + + case ARG_AFTER_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + arg_after_cursor = true; + break; + + case ARG_FOLLOW: + if (optarg) { + r = parse_boolean(optarg); + if (r < 0) { + log_error("Failed to parse --follow= parameter."); + return -EINVAL; + } + + arg_follow = !!r; + } else + arg_follow = true; + + break; + + case ARG_SAVE_STATE: + arg_save_state = optarg ?: STATE_FILE; + break; + case '?': log_error("Unknown option %s.", argv[optind-1]); return -EINVAL; @@ -371,81 +750,122 @@ static int parse_argv(int argc, char *argv[]) { return -EINVAL; } - if (optind >= argc) { - log_error("Input argument missing."); + if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type)) { + log_error("Input arguments make no sense with journal input."); return -EINVAL; } return 1; } +static int open_journal(sd_journal **j) { + int r; + + if (arg_directory) + r = sd_journal_open_directory(j, arg_directory, arg_journal_type); + else if (arg_file) + r = sd_journal_open_files(j, (const char**) arg_file, 0); + else if (arg_machine) + r = sd_journal_open_container(j, arg_machine, 0); + else + r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type); + if (r < 0) + log_error_errno(r, "Failed to open %s: %m", + arg_directory ? arg_directory : arg_file ? "files" : "journal"); + return r; +} int main(int argc, char **argv) { Uploader u; int r; + bool use_journal; log_show_color(true); log_parse_environment(); + r = parse_config(); + if (r < 0) + goto finish; + r = parse_argv(argc, argv); if (r <= 0) goto finish; - r = setup_uploader(&u, arg_url); + sigbus_install(); + + r = setup_uploader(&u, arg_url, arg_save_state); + if (r < 0) + goto cleanup; + + sd_event_set_watchdog(u.events, true); + + r = check_cursor_updating(&u); if (r < 0) goto cleanup; log_debug("%s running as pid "PID_FMT, program_invocation_short_name, getpid()); + + use_journal = optind >= argc; + if (use_journal) { + sd_journal *j; + r = open_journal(&j); + if (r < 0) + goto finish; + r = open_journal_for_upload(&u, j, + arg_cursor ?: u.last_cursor, + arg_cursor ? arg_after_cursor : true, + !!arg_follow); + if (r < 0) + goto finish; + } + sd_notify(false, "READY=1\n" "STATUS=Processing input..."); while (true) { - if (u.input < 0) { + r = sd_event_get_state(u.events); + if (r < 0) + break; + if (r == SD_EVENT_FINISHED) + break; + + if (use_journal) { + if (!u.journal) + break; + + r = check_journal_input(&u); + } else if (u.input < 0 && !use_journal) { if (optind >= argc) break; log_debug("Using %s as input.", argv[optind]); - r = open_file_for_upload(&u, argv[optind++]); - if (r < 0) - goto cleanup; - } - - r = sd_event_get_state(u.events); if (r < 0) - break; - if (r == SD_EVENT_FINISHED) - break; + goto cleanup; if (u.uploading) { - CURLcode code; - - assert(u.easy); - - code = curl_easy_perform(u.easy); - if (code) { - log_error("Upload to %s failed: %s", - u.url, curl_easy_strerror(code)); - r = -EIO; + r = perform_upload(&u); + if (r < 0) break; - } else - log_debug("Upload finished successfully."); } - r = sd_event_run(u.events, u.input >= 0 ? -1 : 0); + r = sd_event_run(u.events, u.timeout); if (r < 0) { - log_error("Failed to run event loop: %s", strerror(-r)); + log_error_errno(r, "Failed to run event loop: %m"); break; } } cleanup: - sd_notify(false, "STATUS=Shutting down..."); + sd_notify(false, + "STOPPING=1\n" + "STATUS=Shutting down..."); + destroy_uploader(&u); finish: - return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE; + return r >= 0 ? EXIT_SUCCESS : EXIT_FAILURE; }