chiark / gitweb /
journal-upload: make state persistent
[elogind.git] / src / journal-remote / journal-upload.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <stdio.h>
23 #include <curl/curl.h>
24 #include <sys/stat.h>
25 #include <fcntl.h>
26 #include <getopt.h>
27
28 #include "sd-daemon.h"
29
30 #include "log.h"
31 #include "util.h"
32 #include "build.h"
33 #include "fileio.h"
34 #include "journal-upload.h"
35
36 static const char* arg_url;
37
38 static void close_fd_input(Uploader *u);
39
40 static const char *arg_key = NULL;
41 static const char *arg_cert = NULL;
42 static const char *arg_trust = NULL;
43
44 static const char *arg_directory = NULL;
45 static char **arg_file = NULL;
46 static const char *arg_cursor = NULL;
47 static bool arg_after_cursor = false;
48 static int arg_journal_type = 0;
49 static const char *arg_machine = NULL;
50 static bool arg_merge = false;
51 static int arg_follow = -1;
52 static const char *arg_save_state = NULL;
53
54 #define SERVER_ANSWER_KEEP 2048
55
56 #define STATE_FILE "/var/lib/systemd/journal-upload/state"
57
/*
 * Set one libcurl option and handle failure in place.
 *
 * Requires a variable `CURLcode code` in the calling scope; it receives the
 * result of curl_easy_setopt(). When the call fails, the failure is logged at
 * `level` and then `cmd` is executed. `cmd` may be a statement such as
 * `return -EXFULL`, or may be left empty to just log and continue.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement and stays safe in unbraced if/else constructs.
 */
#define easy_setopt(curl, opt, value, level, cmd)                       \
        do {                                                            \
                code = curl_easy_setopt(curl, opt, value);              \
                if (code) {                                             \
                        log_full(level,                                 \
                                 "curl_easy_setopt " #opt " failed: %s", \
                                  curl_easy_strerror(code));            \
                        cmd;                                            \
                }                                                       \
        } while (0)
68
69 static size_t output_callback(char *buf,
70                               size_t size,
71                               size_t nmemb,
72                               void *userp) {
73         Uploader *u = userp;
74
75         assert(u);
76
77         log_debug("The server answers (%zu bytes): %.*s",
78                   size*nmemb, (int)(size*nmemb), buf);
79
80         if (nmemb && !u->answer) {
81                 u->answer = strndup(buf, size*nmemb);
82                 if (!u->answer)
83                         log_warning("Failed to store server answer (%zu bytes): %s",
84                                     size*nmemb, strerror(ENOMEM));
85         }
86
87         return size * nmemb;
88 }
89
90 static int update_cursor_state(Uploader *u) {
91         _cleanup_free_ char *temp_path = NULL;
92         _cleanup_fclose_ FILE *f = NULL;
93         int r;
94
95         if (!u->state_file || !u->last_cursor)
96                 return 0;
97
98         r = fopen_temporary(u->state_file, &f, &temp_path);
99         if (r < 0)
100                 goto finish;
101
102         fprintf(f,
103                 "# This is private data. Do not parse.\n"
104                 "LAST_CURSOR=%s\n",
105                 u->last_cursor);
106
107         fflush(f);
108
109         if (ferror(f) || rename(temp_path, u->state_file) < 0) {
110                 r = -errno;
111                 unlink(u->state_file);
112                 unlink(temp_path);
113         }
114
115 finish:
116         if (r < 0)
117                 log_error("Failed to save state %s: %s", u->state_file, strerror(-r));
118
119         return r;
120 }
121
122 static int load_cursor_state(Uploader *u) {
123         int r;
124
125         if (!u->state_file)
126                 return 0;
127
128         r = parse_env_file(u->state_file, NEWLINE,
129                            "LAST_CURSOR",  &u->last_cursor,
130                            NULL);
131
132         if (r < 0 && r != -ENOENT) {
133                 log_error("Failed to read state file %s: %s",
134                           u->state_file, strerror(-r));
135                 return r;
136         }
137
138         return 0;
139 }
140
141
142
143 int start_upload(Uploader *u,
144                  size_t (*input_callback)(void *ptr,
145                                           size_t size,
146                                           size_t nmemb,
147                                           void *userdata),
148                  void *data) {
149         CURLcode code;
150
151         assert(u);
152         assert(input_callback);
153
154         if (!u->header) {
155                 struct curl_slist *h;
156
157                 h = curl_slist_append(NULL, "Content-Type: application/vnd.fdo.journal");
158                 if (!h)
159                         return log_oom();
160
161                 h = curl_slist_append(h, "Transfer-Encoding: chunked");
162                 if (!h) {
163                         curl_slist_free_all(h);
164                         return log_oom();
165                 }
166
167                 h = curl_slist_append(h, "Accept: text/plain");
168                 if (!h) {
169                         curl_slist_free_all(h);
170                         return log_oom();
171                 }
172
173                 u->header = h;
174         }
175
176         if (!u->easy) {
177                 CURL *curl;
178
179                 curl = curl_easy_init();
180                 if (!curl) {
181                         log_error("Call to curl_easy_init failed.");
182                         return -ENOSR;
183                 }
184
185                 /* tell it to POST to the URL */
186                 easy_setopt(curl, CURLOPT_POST, 1L,
187                             LOG_ERR, return -EXFULL);
188
189                 easy_setopt(curl, CURLOPT_ERRORBUFFER, &u->error,
190                             LOG_ERR, return -EXFULL);
191
192                 /* set where to write to */
193                 easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback,
194                             LOG_ERR, return -EXFULL);
195
196                 easy_setopt(curl, CURLOPT_WRITEDATA, data,
197                             LOG_ERR, return -EXFULL);
198
199                 /* set where to read from */
200                 easy_setopt(curl, CURLOPT_READFUNCTION, input_callback,
201                             LOG_ERR, return -EXFULL);
202
203                 easy_setopt(curl, CURLOPT_READDATA, data,
204                             LOG_ERR, return -EXFULL);
205
206                 /* use our special own mime type and chunked transfer */
207                 easy_setopt(curl, CURLOPT_HTTPHEADER, u->header,
208                             LOG_ERR, return -EXFULL);
209
210                 /* enable verbose for easier tracing */
211                 easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, );
212
213                 easy_setopt(curl, CURLOPT_USERAGENT,
214                             "systemd-journal-upload " PACKAGE_STRING,
215                             LOG_WARNING, );
216
217                 if (arg_key) {
218                         assert(arg_cert);
219
220                         easy_setopt(curl, CURLOPT_SSLKEY, arg_key,
221                                     LOG_ERR, return -EXFULL);
222                         easy_setopt(curl, CURLOPT_SSLCERT, arg_cert,
223                                     LOG_ERR, return -EXFULL);
224                 }
225
226                 if (arg_trust)
227                         easy_setopt(curl, CURLOPT_CAINFO, arg_trust,
228                                     LOG_ERR, return -EXFULL);
229
230                 if (arg_key || arg_trust)
231                         easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1,
232                                     LOG_WARNING, );
233
234                 u->easy = curl;
235         } else {
236                 /* truncate the potential old error message */
237                 u->error[0] = '\0';
238
239                 free(u->answer);
240                 u->answer = 0;
241         }
242
243         /* upload to this place */
244         code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url);
245         if (code) {
246                 log_error("curl_easy_setopt CURLOPT_URL failed: %s",
247                           curl_easy_strerror(code));
248                 return -EXFULL;
249         }
250
251         u->uploading = true;
252
253         return 0;
254 }
255
256 static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
257         Uploader *u = userp;
258
259         ssize_t r;
260
261         assert(u);
262         assert(nmemb <= SSIZE_MAX / size);
263
264         if (u->input < 0)
265                 return 0;
266
267         r = read(u->input, buf, size * nmemb);
268         log_debug("%s: allowed %zu, read %zu", __func__, size*nmemb, r);
269
270         if (r > 0)
271                 return r;
272
273         u->uploading = false;
274         if (r == 0) {
275                 log_debug("Reached EOF");
276                 close_fd_input(u);
277                 return 0;
278         } else {
279                 log_error("Aborting transfer after read error on input: %m.");
280                 return CURL_READFUNC_ABORT;
281         }
282 }
283
284 static void close_fd_input(Uploader *u) {
285         assert(u);
286
287         if (u->input >= 0)
288                 close_nointr(u->input);
289         u->input = -1;
290         u->timeout = 0;
291 }
292
293 static int dispatch_fd_input(sd_event_source *event,
294                              int fd,
295                              uint32_t revents,
296                              void *userp) {
297         Uploader *u = userp;
298
299         assert(u);
300         assert(revents & EPOLLIN);
301         assert(fd >= 0);
302
303         if (u->uploading) {
304                 log_warning("dispatch_fd_input called when uploading, ignoring.");
305                 return 0;
306         }
307
308         return start_upload(u, fd_input_callback, u);
309 }
310
311 static int open_file_for_upload(Uploader *u, const char *filename) {
312         int fd, r;
313
314         if (streq(filename, "-"))
315                 fd = STDIN_FILENO;
316         else {
317                 fd = open(filename, O_RDONLY|O_CLOEXEC|O_NOCTTY);
318                 if (fd < 0) {
319                         log_error("Failed to open %s: %m", filename);
320                         return -errno;
321                 }
322         }
323
324         u->input = fd;
325
326         if (arg_follow) {
327                 r = sd_event_add_io(u->events, &u->input_event,
328                                     fd, EPOLLIN, dispatch_fd_input, u);
329                 if (r < 0) {
330                         if (r != -EPERM || arg_follow > 0) {
331                                 log_error("Failed to register input event: %s", strerror(-r));
332                                 return r;
333                         }
334
335                         /* Normal files should just be consumed without polling. */
336                         r = start_upload(u, fd_input_callback, u);
337                 }
338         }
339
340         return r;
341 }
342
343 static int setup_uploader(Uploader *u, const char *url, const char *state_file) {
344         int r;
345
346         assert(u);
347         assert(url);
348
349         memzero(u, sizeof(Uploader));
350         u->input = -1;
351
352         u->url = url;
353         u->state_file = state_file;
354
355         r = sd_event_default(&u->events);
356         if (r < 0) {
357                 log_error("sd_event_default failed: %s", strerror(-r));
358                 return r;
359         }
360
361         return load_cursor_state(u);
362 }
363
364 static void destroy_uploader(Uploader *u) {
365         assert(u);
366
367         curl_easy_cleanup(u->easy);
368         curl_slist_free_all(u->header);
369         free(u->answer);
370
371         free(u->last_cursor);
372         free(u->current_cursor);
373
374         u->input_event = sd_event_source_unref(u->input_event);
375
376         close_fd_input(u);
377         close_journal_input(u);
378
379         sd_event_unref(u->events);
380 }
381
382 static int perform_upload(Uploader *u) {
383         CURLcode code;
384         long status;
385
386         assert(u);
387
388         code = curl_easy_perform(u->easy);
389         if (code) {
390                 log_error("Upload to %s failed: %.*s",
391                           u->url,
392                           u->error[0] ? (int) sizeof(u->error) : INT_MAX,
393                           u->error[0] ? u->error : curl_easy_strerror(code));
394                 return -EIO;
395         }
396
397         code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status);
398         if (code) {
399                 log_error("Failed to retrieve response code: %s",
400                           curl_easy_strerror(code));
401                 return -EUCLEAN;
402         }
403
404         if (status >= 300) {
405                 log_error("Upload to %s failed with code %lu: %s",
406                           u->url, status, strna(u->answer));
407                 return -EIO;
408         } else if (status < 200) {
409                 log_error("Upload to %s finished with unexpected code %lu: %s",
410                           u->url, status, strna(u->answer));
411                 return -EIO;
412         } else
413                 log_debug("Upload finished successfully with code %lu: %s",
414                           status, strna(u->answer));
415
416         free(u->last_cursor);
417         u->last_cursor = u->current_cursor;
418         u->current_cursor = NULL;
419
420         return update_cursor_state(u);
421 }
422
423 static void help(void) {
424         printf("%s -u URL {FILE|-}...\n\n"
425                "Upload journal events to a remote server.\n\n"
426                "Options:\n"
427                "  --url=URL                Upload to this address\n"
428                "  --key=FILENAME           Specify key in PEM format\n"
429                "  --cert=FILENAME          Specify certificate in PEM format\n"
430                "  --trust=FILENAME         Specify CA certificate in PEM format\n"
431                "     --system              Use the system journal\n"
432                "     --user                Use the user journal for the current user\n"
433                "  -m --merge               Use  all available journals\n"
434                "  -M --machine=CONTAINER   Operate on local container\n"
435                "  -D --directory=PATH      Use journal files from directory\n"
436                "     --file=PATH           Use this journal file\n"
437                "  --cursor=CURSOR          Start at the specified cursor\n"
438                "  --after-cursor=CURSOR    Start after the specified cursor\n"
439                "  --[no-]follow            Do [not] wait for input\n"
440                "  --save-state[=FILE]      Save uploaded cursors (default \n"
441                "                           " STATE_FILE ")\n"
442                "  -h --help                Show this help and exit\n"
443                "  --version                Print version string and exit\n"
444                , program_invocation_short_name);
445 }
446
447 static int parse_argv(int argc, char *argv[]) {
448         enum {
449                 ARG_VERSION = 0x100,
450                 ARG_KEY,
451                 ARG_CERT,
452                 ARG_TRUST,
453                 ARG_USER,
454                 ARG_SYSTEM,
455                 ARG_FILE,
456                 ARG_CURSOR,
457                 ARG_AFTER_CURSOR,
458                 ARG_FOLLOW,
459                 ARG_NO_FOLLOW,
460                 ARG_SAVE_STATE,
461         };
462
463         static const struct option options[] = {
464                 { "help",         no_argument,       NULL, 'h'                },
465                 { "version",      no_argument,       NULL, ARG_VERSION        },
466                 { "url",          required_argument, NULL, 'u'                },
467                 { "key",          required_argument, NULL, ARG_KEY            },
468                 { "cert",         required_argument, NULL, ARG_CERT           },
469                 { "trust",        required_argument, NULL, ARG_TRUST          },
470                 { "system",       no_argument,       NULL, ARG_SYSTEM         },
471                 { "user",         no_argument,       NULL, ARG_USER           },
472                 { "merge",        no_argument,       NULL, 'm'                },
473                 { "machine",      required_argument, NULL, 'M'                },
474                 { "directory",    required_argument, NULL, 'D'                },
475                 { "file",         required_argument, NULL, ARG_FILE           },
476                 { "cursor",       required_argument, NULL, ARG_CURSOR         },
477                 { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR   },
478                 { "follow",       no_argument,       NULL, ARG_FOLLOW         },
479                 { "no-follow",    no_argument,       NULL, ARG_NO_FOLLOW      },
480                 { "save-state",   optional_argument, NULL, ARG_SAVE_STATE     },
481                 {}
482         };
483
484         int c, r;
485
486         assert(argc >= 0);
487         assert(argv);
488
489         opterr = 0;
490
491         while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0)
492                 switch(c) {
493                 case 'h':
494                         help();
495                         return 0 /* done */;
496
497                 case ARG_VERSION:
498                         puts(PACKAGE_STRING);
499                         puts(SYSTEMD_FEATURES);
500                         return 0 /* done */;
501
502                 case 'u':
503                         if (arg_url) {
504                                 log_error("cannot use more than one --url");
505                                 return -EINVAL;
506                         }
507
508                         arg_url = optarg;
509                         break;
510
511                 case ARG_KEY:
512                         if (arg_key) {
513                                 log_error("cannot use more than one --key");
514                                 return -EINVAL;
515                         }
516
517                         arg_key = optarg;
518                         break;
519
520                 case ARG_CERT:
521                         if (arg_cert) {
522                                 log_error("cannot use more than one --cert");
523                                 return -EINVAL;
524                         }
525
526                         arg_cert = optarg;
527                         break;
528
529                 case ARG_TRUST:
530                         if (arg_trust) {
531                                 log_error("cannot use more than one --trust");
532                                 return -EINVAL;
533                         }
534
535                         arg_trust = optarg;
536                         break;
537
538                 case ARG_SYSTEM:
539                         arg_journal_type |= SD_JOURNAL_SYSTEM;
540                         break;
541
542                 case ARG_USER:
543                         arg_journal_type |= SD_JOURNAL_CURRENT_USER;
544                         break;
545
546                 case 'm':
547                         arg_merge = true;
548                         break;
549
550                 case 'M':
551                         if (arg_machine) {
552                                 log_error("cannot use more than one --machine/-M");
553                                 return -EINVAL;
554                         }
555
556                         arg_machine = optarg;
557                         break;
558
559                 case 'D':
560                         if (arg_directory) {
561                                 log_error("cannot use more than one --directory/-D");
562                                 return -EINVAL;
563                         }
564
565                         arg_directory = optarg;
566                         break;
567
568                 case ARG_FILE:
569                         r = glob_extend(&arg_file, optarg);
570                         if (r < 0) {
571                                 log_error("Failed to add paths: %s", strerror(-r));
572                                 return r;
573                         };
574                         break;
575
576                 case ARG_CURSOR:
577                         if (arg_cursor) {
578                                 log_error("cannot use more than one --cursor/--after-cursor");
579                                 return -EINVAL;
580                         }
581
582                         arg_cursor = optarg;
583                         break;
584
585                 case ARG_AFTER_CURSOR:
586                         if (arg_cursor) {
587                                 log_error("cannot use more than one --cursor/--after-cursor");
588                                 return -EINVAL;
589                         }
590
591                         arg_cursor = optarg;
592                         arg_after_cursor = true;
593                         break;
594
595                 case ARG_FOLLOW:
596                         arg_follow = true;
597                         break;
598
599                 case ARG_NO_FOLLOW:
600                         arg_follow = false;
601                         break;
602
603                 case ARG_SAVE_STATE:
604                         arg_save_state = optarg ?: STATE_FILE;
605                         break;
606
607                 case '?':
608                         log_error("Unknown option %s.", argv[optind-1]);
609                         return -EINVAL;
610
611                 case ':':
612                         log_error("Missing argument to %s.", argv[optind-1]);
613                         return -EINVAL;
614
615                 default:
616                         assert_not_reached("Unhandled option code.");
617                 }
618
619         if (!arg_url) {
620                 log_error("Required --url/-u option missing.");
621                 return -EINVAL;
622         }
623
624         if (!!arg_key != !!arg_cert) {
625                 log_error("Options --key and --cert must be used together.");
626                 return -EINVAL;
627         }
628
629         if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type)) {
630                 log_error("Input arguments make no sense with journal input.");
631                 return -EINVAL;
632         }
633
634         return 1;
635 }
636
637 static int open_journal(sd_journal **j) {
638         int r;
639
640         if (arg_directory)
641                 r = sd_journal_open_directory(j, arg_directory, arg_journal_type);
642         else if (arg_file)
643                 r = sd_journal_open_files(j, (const char**) arg_file, 0);
644         else if (arg_machine)
645                 r = sd_journal_open_container(j, arg_machine, 0);
646         else
647                 r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type);
648         if (r < 0)
649                 log_error("Failed to open %s: %s",
650                           arg_directory ? arg_directory : arg_file ? "files" : "journal",
651                           strerror(-r));
652         return r;
653 }
654
655 int main(int argc, char **argv) {
656         Uploader u;
657         int r;
658         bool use_journal;
659
660         log_show_color(true);
661         log_parse_environment();
662
663         r = parse_argv(argc, argv);
664         if (r <= 0)
665                 goto finish;
666
667         r = setup_uploader(&u, arg_url, arg_save_state);
668         if (r < 0)
669                 goto cleanup;
670
671         log_debug("%s running as pid "PID_FMT,
672                   program_invocation_short_name, getpid());
673
674         use_journal = optind >= argc;
675         if (use_journal) {
676                 sd_journal *j;
677                 r = open_journal(&j);
678                 if (r < 0)
679                         goto finish;
680                 r = open_journal_for_upload(&u, j,
681                                             arg_cursor ?: u.last_cursor,
682                                             arg_cursor ? arg_after_cursor : true,
683                                             !!arg_follow);
684                 if (r < 0)
685                         goto finish;
686         }
687
688         sd_notify(false,
689                   "READY=1\n"
690                   "STATUS=Processing input...");
691
692         while (true) {
693                 if (use_journal) {
694                         if (!u.journal)
695                                 break;
696
697                         r = check_journal_input(&u);
698                 } else if (u.input < 0 && !use_journal) {
699                         if (optind >= argc)
700                                 break;
701
702                         log_debug("Using %s as input.", argv[optind]);
703                         r = open_file_for_upload(&u, argv[optind++]);
704                 }
705                 if (r < 0)
706                         goto cleanup;
707
708                 r = sd_event_get_state(u.events);
709                 if (r < 0)
710                         break;
711                 if (r == SD_EVENT_FINISHED)
712                         break;
713
714                 if (u.uploading) {
715                         r = perform_upload(&u);
716                         if (r < 0)
717                                 break;
718                 }
719
720                 r = sd_event_run(u.events, u.timeout);
721                 if (r < 0) {
722                         log_error("Failed to run event loop: %s", strerror(-r));
723                         break;
724                 }
725         }
726
727 cleanup:
728         sd_notify(false, "STATUS=Shutting down...");
729         destroy_uploader(&u);
730
731 finish:
732         return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
733 }