chiark / gitweb /
journal-remote: HTTP(s) support
[elogind.git] / src / journal / journal-remote.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2012 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/prctl.h>
29 #include <sys/socket.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <unistd.h>
33 #include <getopt.h>
34
35 #include "sd-daemon.h"
36 #include "sd-event.h"
37 #include "journal-file.h"
38 #include "journald-native.h"
39 #include "socket-util.h"
40 #include "mkdir.h"
41 #include "build.h"
42 #include "macro.h"
43 #include "strv.h"
44 #include "fileio.h"
45 #include "socket-util.h"
46 #include "microhttpd-util.h"
47
48 #ifdef HAVE_GNUTLS
49 #include <gnutls/gnutls.h>
50 #endif
51
52 #include "journal-remote-parse.h"
53 #include "journal-remote-write.h"
54
/* printf-style template for the default output file: the machine ID is
 * substituted for SD_ID128_FORMAT_STR and a source-derived name for %s. */
#define REMOTE_JOURNAL_PATH "/var/log/journal/" SD_ID128_FORMAT_STR "/remote-%s.journal"

/* Where to write: a file, a directory, or NULL for the default path. */
static char *arg_output = NULL;

/* Input selection. */
static char *arg_url = NULL;          /* base URL; "/entries" is appended to it */
static char *arg_getter = NULL;       /* command spawned instead of curl */
static bool arg_stdin = false;        /* read entries from standard input */
static char *arg_listen_raw = NULL;   /* address for raw connections */
static char *arg_listen_http = NULL;  /* address for HTTP uploads */
static char *arg_listen_https = NULL; /* address for HTTPS uploads */

/* Flags handed to the journal file when it is opened and written. */
static int arg_compress = true;
static int arg_seal = false;

/* PEM data passed to the HTTPS daemon; when trust_pem is set, incoming
 * requests are additionally run through check_permissions(). */
static char *key_pem = NULL;
static char *cert_pem = NULL;
static char *trust_pem = NULL;
70
71 /**********************************************************************
72  **********************************************************************
73  **********************************************************************/
74
/*
 * Fork and exec `child` with argument vector `argv`, connecting the child's
 * standard output to a pipe.
 *
 * Returns the read end of the pipe (>= 0) on success, or a negative errno
 * value if the pipe could not be created or the fork failed. Failures that
 * happen inside the child (dup2, prctl, exec) only show up as the child
 * exiting with EXIT_FAILURE.
 */
static int spawn_child(const char* child, char** argv) {
        int fd[2];
        pid_t parent_pid, child_pid;
        int r;

        if (pipe(fd) < 0) {
                /* Capture errno before logging, as the fork path below does */
                r = -errno;
                log_error("Failed to create pipe: %m");
                return r;
        }

        /* Remembered so the child can detect that we died early */
        parent_pid = getpid();

        child_pid = fork();
        if (child_pid < 0) {
                r = -errno;
                log_error("Failed to fork: %m");
                close_pipe(fd);
                return r;
        }

        /* In the child */
        if (child_pid == 0) {
                r = dup2(fd[1], STDOUT_FILENO);
                if (r < 0) {
                        log_error("Failed to dup pipe to stdout: %m");
                        _exit(EXIT_FAILURE);
                }

                /* stdout now refers to the pipe; the original fds are
                 * no longer needed */
                r = close_pipe(fd);
                if (r < 0)
                        log_warning("Failed to close pipe fds: %m");

                /* Make sure the child goes away when the parent dies */
                if (prctl(PR_SET_PDEATHSIG, SIGTERM) < 0)
                        _exit(EXIT_FAILURE);

                /* Check whether our parent died before we were able
                 * to set the death signal */
                if (getppid() != parent_pid)
                        _exit(EXIT_SUCCESS);

                execvp(child, argv);
                log_error("Failed to exec child %s: %m", child);
                _exit(EXIT_FAILURE);
        }

        /* In the parent: keep only the read end */
        r = close(fd[1]);
        if (r < 0)
                log_warning("Failed to close write end of pipe: %m");

        return fd[0];
}
127
/*
 * Spawn curl to fetch `url`, asking for the journal export content type.
 *
 * Returns what spawn_child() returns: a readable fd connected to curl's
 * standard output, or a negative errno value.
 */
static int spawn_curl(char* url) {
        char **argv = STRV_MAKE("curl",
                                "-HAccept: application/vnd.fdo.journal",
                                "--silent",
                                "--show-error",
                                url);
        int r;

        r = spawn_child("curl", argv);
        if (r < 0)
                /* The error is carried in r; errno is not a reliable
                 * source for it once spawn_child() has returned. */
                log_error("Failed to spawn curl: %s", strerror(-r));
        return r;
}
141
142 static int spawn_getter(char *getter, char *url) {
143         int r;
144         char _cleanup_strv_free_ **words = NULL, **words2 = NULL;
145
146         assert(getter);
147         words = strv_split_quoted(getter);
148         if (!words)
149                 return log_oom();
150
151         r = spawn_child(words[0], words);
152         if (r < 0)
153                 log_error("Failed to spawn getter %s: %m", getter);
154
155         return r;
156 }
157
158 static int open_output(Writer *s, const char* url) {
159         char _cleanup_free_ *name, *output = NULL;
160         char *c;
161         int r;
162
163         assert(url);
164         name = strdup(url);
165         if (!name)
166                 return log_oom();
167
168         for(c = name; *c; c++) {
169                 if (*c == '/' || *c == ':' || *c == ' ')
170                         *c = '~';
171                 else if (*c == '?') {
172                         *c = '\0';
173                         break;
174                 }
175         }
176
177         if (!arg_output) {
178                 sd_id128_t machine;
179                 r = sd_id128_get_machine(&machine);
180                 if (r < 0) {
181                         log_error("failed to determine machine ID128: %s", strerror(-r));
182                         return r;
183                 }
184
185                 r = asprintf(&output, REMOTE_JOURNAL_PATH,
186                              SD_ID128_FORMAT_VAL(machine), name);
187                 if (r < 0)
188                         return log_oom();
189         } else {
190                 r = is_dir(arg_output);
191                 if (r > 0) {
192                         r = asprintf(&output,
193                                      "%s/remote-%s.journal", arg_output, name);
194                         if (r < 0)
195                                 return log_oom();
196                 } else {
197                         output = strdup(arg_output);
198                         if (!output)
199                                 return log_oom();
200                 }
201         }
202
203         r = journal_file_open_reliably(output,
204                                        O_RDWR|O_CREAT, 0640,
205                                        arg_compress, arg_seal,
206                                        &s->metrics,
207                                        s->mmap,
208                                        NULL, &s->journal);
209         if (r < 0)
210                 log_error("Failed to open output journal %s: %s",
211                           arg_output, strerror(-r));
212         else
213                 log_info("Opened output file %s", s->journal->path);
214         return r;
215 }
216
217 /**********************************************************************
218  **********************************************************************
219  **********************************************************************/
220
221 typedef struct MHDDaemonWrapper {
222         uint64_t fd;
223         struct MHD_Daemon *daemon;
224
225         sd_event_source *event;
226 } MHDDaemonWrapper;
227
228 typedef struct RemoteServer {
229         RemoteSource **sources;
230         ssize_t sources_size;
231         ssize_t active;
232
233         sd_event *events;
234         sd_event_source *sigterm_event, *sigint_event, *listen_event;
235
236         Writer writer;
237
238         Hashmap *daemons;
239 } RemoteServer;
240
241 /* This should go away as soon as µhttpd allows state to be passed around. */
242 static RemoteServer *server;
243
244 static int dispatch_raw_source_event(sd_event_source *event,
245                                      int fd,
246                                      uint32_t revents,
247                                      void *userdata);
248 static int dispatch_raw_connection_event(sd_event_source *event,
249                                          int fd,
250                                          uint32_t revents,
251                                          void *userdata);
252 static int dispatch_http_event(sd_event_source *event,
253                                int fd,
254                                uint32_t revents,
255                                void *userdata);
256
257 static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) {
258         assert(fd >= 0);
259         assert(source);
260
261         if (!GREEDY_REALLOC0_T(s->sources, s->sources_size, fd + 1))
262                 return log_oom();
263
264         if (s->sources[fd] == NULL) {
265                 s->sources[fd] = new0(RemoteSource, 1);
266                 if (!s->sources[fd])
267                         return log_oom();
268                 s->sources[fd]->fd = -1;
269                 s->active++;
270         }
271
272         *source = s->sources[fd];
273         return 0;
274 }
275
276 static int remove_source(RemoteServer *s, int fd) {
277         RemoteSource *source;
278
279         assert(s);
280         assert(fd >= 0);
281         assert(fd < s->sources_size);
282
283         source = s->sources[fd];
284         if (source) {
285                 source_free(source);
286                 s->sources[fd] = NULL;
287                 s->active--;
288         }
289
290         close(fd);
291
292         return 0;
293 }
294
295 static int add_source(RemoteServer *s, int fd, const char* name) {
296         RemoteSource *source = NULL;
297         char *realname;
298         int r;
299
300         assert(s);
301         assert(fd >= 0);
302
303         if (name) {
304                 realname = strdup(name);
305                 if (!realname)
306                         return log_oom();
307         } else {
308                 r = asprintf(&realname, "fd:%d", fd);
309                 if (r < 0)
310                         return log_oom();
311         }
312
313         log_debug("Creating source for fd:%d (%s)", fd, name);
314
315         r = get_source_for_fd(s, fd, &source);
316         if (r < 0) {
317                 log_error("Failed to create source for fd:%d (%s)", fd, name);
318                 return r;
319         }
320         assert(source);
321         assert(source->fd < 0);
322         source->fd = fd;
323
324         r = sd_event_add_io(s->events, &source->event,
325                             fd, EPOLLIN, dispatch_raw_source_event, s);
326         if (r < 0) {
327                 log_error("Failed to register event source for fd:%d: %s",
328                           fd, strerror(-r));
329                 goto error;
330         }
331
332         return 1; /* work to do */
333
334  error:
335         remove_source(s, fd);
336         return r;
337 }
338
339 static int setup_raw_socket(RemoteServer *s, const char *address) {
340         int fd, r;
341
342         fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
343         if (fd < 0)
344                 return fd;
345
346         r = sd_event_add_io(s->events, &s->listen_event, fd, EPOLLIN,
347                             dispatch_raw_connection_event, s);
348         if (r < 0) {
349                 close(fd);
350                 return r;
351         }
352
353         s->active ++;
354         return 0;
355 }
356
357 /**********************************************************************
358  **********************************************************************
359  **********************************************************************/
360
361 static RemoteSource *request_meta(void **connection_cls) {
362         RemoteSource *source;
363
364         assert(connection_cls);
365         if (*connection_cls)
366                 return *connection_cls;
367
368         source = new0(RemoteSource, 1);
369         if (!source)
370                 return NULL;
371         source->fd = -1;
372
373         log_debug("Added RemoteSource as connection metadata %p", source);
374
375         *connection_cls = source;
376         return source;
377 }
378
379 static void request_meta_free(void *cls,
380                               struct MHD_Connection *connection,
381                               void **connection_cls,
382                               enum MHD_RequestTerminationCode toe) {
383         RemoteSource *s;
384
385         assert(connection_cls);
386         s = *connection_cls;
387
388         log_debug("Cleaning up connection metadata %p", s);
389         source_free(s);
390         *connection_cls = NULL;
391 }
392
393 static int process_http_upload(
394                 struct MHD_Connection *connection,
395                 const char *upload_data,
396                 size_t *upload_data_size,
397                 RemoteSource *source) {
398
399         bool finished = false;
400         int r;
401
402         assert(source);
403
404         log_debug("request_handler_upload: connection %p, %zu bytes",
405                   connection, *upload_data_size);
406
407         if (*upload_data_size) {
408                 log_info("Received %zu bytes", *upload_data_size);
409
410                 r = push_data(source, upload_data, *upload_data_size);
411                 if (r < 0) {
412                         log_error("Failed to store received data of size %zu: %s",
413                                   *upload_data_size, strerror(-r));
414                         return respond_oom_internal(connection);
415                 }
416                 *upload_data_size = 0;
417         } else
418                 finished = true;
419
420         while (true) {
421                 r = process_source(source, &server->writer, arg_compress, arg_seal);
422                 if (r == -E2BIG)
423                         log_warning("Entry too big, skipped");
424                 else if (r == -EAGAIN || r == -EWOULDBLOCK)
425                         break;
426                 else if (r < 0) {
427                         log_warning("Failed to process data for connection %p", connection);
428                         return respond_error(connection, MHD_HTTP_UNPROCESSABLE_ENTITY,
429                                              "Processing failed: %s", strerror(-r));
430                 }
431         }
432
433         if (!finished)
434                 return MHD_YES;
435
436         /* The upload is finished */
437
438         if (source_non_empty(source)) {
439                 log_warning("EOF reached with incomplete data");
440                 return respond_error(connection, MHD_HTTP_EXPECTATION_FAILED,
441                                      "Trailing data not processed.");
442         }
443
444         return respond_error(connection, MHD_HTTP_ACCEPTED, "OK.\n");
445 };
446
447 static int request_handler(
448                 void *cls,
449                 struct MHD_Connection *connection,
450                 const char *url,
451                 const char *method,
452                 const char *version,
453                 const char *upload_data,
454                 size_t *upload_data_size,
455                 void **connection_cls) {
456
457         const char *header;
458         int r ,code;
459
460         assert(connection);
461         assert(connection_cls);
462         assert(url);
463         assert(method);
464
465         log_debug("Handling a connection %s %s %s", method, url, version);
466
467         if (*connection_cls)
468                 return process_http_upload(connection,
469                                            upload_data, upload_data_size,
470                                            *connection_cls);
471
472         if (!streq(method, "POST"))
473                 return respond_error(connection, MHD_HTTP_METHOD_NOT_ACCEPTABLE,
474                                      "Unsupported method.\n");
475
476         if (!streq(url, "/upload"))
477                 return respond_error(connection, MHD_HTTP_NOT_FOUND,
478                                      "Not found.\n");
479
480         header = MHD_lookup_connection_value(connection,
481                                              MHD_HEADER_KIND, "Content-Type");
482         if (!header || !streq(header, "application/vnd.fdo.journal"))
483                 return respond_error(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
484                                      "Content-Type: application/vnd.fdo.journal"
485                                      " is required.\n");
486
487         if (trust_pem) {
488                 r = check_permissions(connection, &code);
489                 if (r < 0)
490                         return code;
491         }
492
493         if (!request_meta(connection_cls))
494                 return respond_oom(connection);
495         return MHD_YES;
496 }
497
498 static int setup_microhttpd_server(RemoteServer *s, int fd, bool https) {
499         struct MHD_OptionItem opts[] = {
500                 { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free},
501                 { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger},
502                 { MHD_OPTION_LISTEN_SOCKET, fd},
503                 { MHD_OPTION_END},
504                 { MHD_OPTION_END},
505                 { MHD_OPTION_END},
506                 { MHD_OPTION_END}};
507         int opts_pos = 3;
508         int flags =
509                 MHD_USE_DEBUG |
510                 MHD_USE_PEDANTIC_CHECKS |
511                 MHD_USE_EPOLL_LINUX_ONLY |
512                 MHD_USE_DUAL_STACK;
513
514         const union MHD_DaemonInfo *info;
515         int r, epoll_fd;
516         MHDDaemonWrapper *d;
517
518         assert(fd >= 0);
519
520         r = fd_nonblock(fd, true);
521         if (r < 0) {
522                 log_error("Failed to make fd:%d nonblocking: %s", fd, strerror(-r));
523                 return r;
524         }
525
526         if (https) {
527                 opts[opts_pos++] = (struct MHD_OptionItem)
528                         {MHD_OPTION_HTTPS_MEM_KEY, 0, key_pem};
529                 opts[opts_pos++] = (struct MHD_OptionItem)
530                         {MHD_OPTION_HTTPS_MEM_CERT, 0, cert_pem};
531
532                 flags |= MHD_USE_SSL;
533
534                 if (trust_pem)
535                         opts[opts_pos++] = (struct MHD_OptionItem)
536                                 {MHD_OPTION_HTTPS_MEM_TRUST, 0, trust_pem};
537         }
538
539         d = new(MHDDaemonWrapper, 1);
540         if (!d)
541                 return log_oom();
542
543         d->fd = (uint64_t) fd;
544
545         d->daemon = MHD_start_daemon(flags, 0,
546                                      NULL, NULL,
547                                      request_handler, NULL,
548                                      MHD_OPTION_ARRAY, opts,
549                                      MHD_OPTION_END);
550         if (!d->daemon) {
551                 log_error("Failed to start Âµhttp daemon");
552                 r = -EINVAL;
553                 goto error;
554         }
555
556         log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)",
557                   https ? "HTTPS" : "HTTP", fd, d);
558
559
560         info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY);
561         if (!info) {
562                 log_error("µhttp returned NULL daemon info");
563                 r = -ENOTSUP;
564                 goto error;
565         }
566
567         epoll_fd = info->listen_fd;
568         if (epoll_fd < 0) {
569                 log_error("µhttp epoll fd is invalid");
570                 r = -EUCLEAN;
571                 goto error;
572         }
573
574         r = sd_event_add_io(s->events, &d->event,
575                             epoll_fd, EPOLLIN, dispatch_http_event, d);
576         if (r < 0) {
577                 log_error("Failed to add event callback: %s", strerror(-r));
578                 goto error;
579         }
580
581         r = hashmap_ensure_allocated(&s->daemons, uint64_hash_func, uint64_compare_func);
582         if (r < 0) {
583                 log_oom();
584                 goto error;
585         }
586
587         r = hashmap_put(s->daemons, &d->fd, d);
588         if (r < 0) {
589                 log_error("Failed to add daemon to hashmap: %s", strerror(-r));
590                 goto error;
591         }
592
593         s->active ++;
594         return 0;
595
596 error:
597         MHD_stop_daemon(d->daemon);
598         free(d->daemon);
599         free(d);
600         return r;
601 }
602
603 static int setup_microhttpd_socket(RemoteServer *s,
604                                    const char *address,
605                                    bool https) {
606         int fd;
607
608         fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
609         if (fd < 0)
610                 return fd;
611
612         return setup_microhttpd_server(s, fd, https);
613 }
614
615 static int dispatch_http_event(sd_event_source *event,
616                                int fd,
617                                uint32_t revents,
618                                void *userdata) {
619         MHDDaemonWrapper *d = userdata;
620         int r;
621
622         assert(d);
623
624         log_info("%s", __func__);
625
626         r = MHD_run(d->daemon);
627         if (r == MHD_NO) {
628                 log_error("MHD_run failed!");
629                 // XXX: unregister daemon
630                 return -EINVAL;
631         }
632
633         return 1; /* work to do */
634 }
635
636 /**********************************************************************
637  **********************************************************************
638  **********************************************************************/
639
640 static int dispatch_sigterm(sd_event_source *event,
641                             const struct signalfd_siginfo *si,
642                             void *userdata) {
643         RemoteServer *s = userdata;
644
645         assert(s);
646
647         log_received_signal(LOG_INFO, si);
648
649         sd_event_exit(s->events, 0);
650         return 0;
651 }
652
653 static int setup_signals(RemoteServer *s) {
654         sigset_t mask;
655         int r;
656
657         assert(s);
658
659         assert_se(sigemptyset(&mask) == 0);
660         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
661         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
662
663         r = sd_event_add_signal(s->events, &s->sigterm_event, SIGTERM, dispatch_sigterm, s);
664         if (r < 0)
665                 return r;
666
667         r = sd_event_add_signal(s->events, &s->sigint_event, SIGINT, dispatch_sigterm, s);
668         if (r < 0)
669                 return r;
670
671         return 0;
672 }
673
674 static int remoteserver_init(RemoteServer *s) {
675         int r, n, fd;
676         const char *output_name = NULL;
677
678         assert(s);
679
680         sd_event_default(&s->events);
681
682         setup_signals(s);
683
684         assert(server == NULL);
685         server = s;
686
687         n = sd_listen_fds(true);
688         if (n < 0) {
689                 log_error("Failed to read listening file descriptors from environment: %s",
690                           strerror(-n));
691                 return n;
692         } else
693                 log_info("Received %d descriptors", n);
694
695         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
696                 if (sd_is_socket(fd, AF_UNSPEC, 0, false)) {
697                         assert_not_reached("not implemented");
698                 } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
699                         log_info("Received a connection socket (fd:%d)", fd);
700
701                         r = add_source(s, fd, NULL);
702                         output_name = "socket";
703                 } else {
704                         log_error("Unknown socket passed on fd:%d", fd);
705                         return -EINVAL;
706                 }
707         }
708
709         if (arg_url) {
710                 char _cleanup_free_ *url = NULL;
711                 char _cleanup_strv_free_ **urlv = strv_new(arg_url, "/entries", NULL);
712                 if (!urlv)
713                         return log_oom();
714                 url = strv_join(urlv, "");
715                 if (!url)
716                         return log_oom();
717
718                 if (arg_getter) {
719                         log_info("Spawning getter %s...", url);
720                         fd = spawn_getter(arg_getter, url);
721                 } else {
722                         log_info("Spawning curl %s...", url);
723                         fd = spawn_curl(url);
724                 }
725                 if (fd < 0)
726                         return fd;
727
728                 r = add_source(s, fd, arg_url);
729                 if (r < 0)
730                         return r;
731
732                 output_name = arg_url;
733         }
734
735         if (arg_listen_raw) {
736                 log_info("Listening on a socket...");
737                 r = setup_raw_socket(s, arg_listen_raw);
738                 if (r < 0)
739                         return r;
740
741                 output_name = arg_listen_raw;
742         }
743
744         if (arg_listen_http) {
745                 r = setup_microhttpd_socket(s, arg_listen_http, false);
746                 if (r < 0)
747                         return r;
748
749                 output_name = arg_listen_http;
750         }
751
752         if (arg_listen_https) {
753                 r = setup_microhttpd_socket(s, arg_listen_https, true);
754                 if (r < 0)
755                         return r;
756
757                 output_name = arg_listen_https;
758         }
759
760         if (arg_stdin) {
761                 log_info("Reading standard input...");
762                 r = add_source(s, STDIN_FILENO, "stdin");
763                 if (r < 0)
764                         return r;
765
766                 output_name = "stdin";
767         }
768
769         if (s->active == 0) {
770                 log_error("Zarro sources specified");
771                 return -EINVAL;
772         }
773
774         if (!!n + !!arg_url + !!arg_listen_raw + !!arg_stdin > 1)
775                 output_name = "multiple";
776
777         r = writer_init(&s->writer);
778         if (r < 0)
779                 return r;
780
781         r = open_output(&s->writer, output_name);
782         return r;
783 }
784
785 static int server_destroy(RemoteServer *s) {
786         int r;
787         ssize_t i;
788         MHDDaemonWrapper *d;
789
790         r = writer_close(&s->writer);
791
792         while ((d = hashmap_steal_first(s->daemons))) {
793                 MHD_stop_daemon(d->daemon);
794                 sd_event_source_unref(d->event);
795                 free(d);
796         }
797
798         hashmap_free(s->daemons);
799
800         assert(s->sources_size == 0 || s->sources);
801         for (i = 0; i < s->sources_size; i++)
802                 remove_source(s, i);
803
804         free(s->sources);
805
806         sd_event_source_unref(s->sigterm_event);
807         sd_event_source_unref(s->sigint_event);
808         sd_event_source_unref(s->listen_event);
809         sd_event_unref(s->events);
810
811         /* fds that we're listening on remain open... */
812
813         return r;
814 }
815
816 /**********************************************************************
817  **********************************************************************
818  **********************************************************************/
819
820 static int dispatch_raw_source_event(sd_event_source *event,
821                                      int fd,
822                                      uint32_t revents,
823                                      void *userdata) {
824
825         RemoteServer *s = userdata;
826         RemoteSource *source;
827         int r;
828
829         assert(fd < s->sources_size);
830         source = s->sources[fd];
831         assert(source->fd == fd);
832
833         r = process_source(source, &s->writer, arg_compress, arg_seal);
834         if (source->state == STATE_EOF) {
835                 log_info("EOF reached with source fd:%d (%s)",
836                          source->fd, source->name);
837                 if (source_non_empty(source))
838                         log_warning("EOF reached with incomplete data");
839                 remove_source(s, source->fd);
840                 log_info("%zd active source remaining", s->active);
841         } else if (r == -E2BIG) {
842                 log_error("Entry too big, skipped");
843                 r = 1;
844         }
845
846         return r;
847 }
848
849 static int accept_connection(const char* type, int fd, SocketAddress *addr) {
850         int fd2, r;
851
852         log_debug("Accepting new %s connection on fd:%d", type, fd);
853         fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
854         if (fd2 < 0) {
855                 log_error("accept() on fd:%d failed: %m", fd);
856                 return -errno;
857         }
858
859         switch(socket_address_family(addr)) {
860         case AF_INET:
861         case AF_INET6: {
862                 char* _cleanup_free_ a = NULL;
863
864                 r = socket_address_print(addr, &a);
865                 if (r < 0) {
866                         log_error("socket_address_print(): %s", strerror(-r));
867                         close(fd2);
868                         return r;
869                 }
870
871                 log_info("Accepted %s %s connection from %s",
872                          type,
873                          socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
874                          a);
875
876                 return fd2;
877         };
878         default:
879                 log_error("Rejected %s connection with unsupported family %d",
880                           type, socket_address_family(addr));
881                 close(fd2);
882
883                 return -EINVAL;
884         }
885 }
886
887 static int dispatch_raw_connection_event(sd_event_source *event,
888                                          int fd,
889                                          uint32_t revents,
890                                          void *userdata) {
891         RemoteServer *s = userdata;
892         int fd2;
893         SocketAddress addr = {
894                 .size = sizeof(union sockaddr_union),
895                 .type = SOCK_STREAM,
896         };
897
898         fd2 = accept_connection("raw", fd, &addr);
899         if (fd2 < 0)
900                 return fd2;
901
902         return add_source(s, fd2, NULL);
903 }
904
905 /**********************************************************************
906  **********************************************************************
907  **********************************************************************/
908
/*
 * Print the usage summary for the program to standard output.
 *
 * The list covers every long option accepted by parse_argv(), including the
 * --key/--cert/--trust options that HTTPS listening depends on.
 *
 * Always returns 0.
 */
static int help(void) {
        printf("%s [OPTIONS...]\n\n"
               "Write external journal events to a journal file.\n\n"
               "Options:\n"
               "  --url=URL            Read events from systemd-journal-gatewayd at URL\n"
               "  --getter=COMMAND     Read events from the output of COMMAND\n"
               "  --listen-raw=ADDR    Listen for connections at ADDR\n"
               "  --listen-http=ADDR   Listen for HTTP connections at ADDR\n"
               "  --listen-https=ADDR  Listen for HTTPS connections at ADDR\n"
               "  --key=FILENAME       Specify key in PEM format (required for HTTPS)\n"
               "  --cert=FILENAME      Specify certificate in PEM format (required for HTTPS)\n"
               "  --trust=FILENAME     Specify CA certificate in PEM format\n"
               "  --stdin              Read events from standard input\n"
               "  -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n"
               "  --[no-]compress      Use XZ-compression in the output journal (default: yes)\n"
               "  --[no-]seal          Use Event sealing in the output journal (default: no)\n"
               "  -h --help            Show this help and exit\n"
               "  --version            Print version string and exit\n"
               "\n"
               "Note: file descriptors from sd_listen_fds() will be consumed, too.\n"
               , program_invocation_short_name);

        return 0;
}
930
931 static int parse_argv(int argc, char *argv[]) {
932         enum {
933                 ARG_VERSION = 0x100,
934                 ARG_URL,
935                 ARG_LISTEN_RAW,
936                 ARG_LISTEN_HTTP,
937                 ARG_LISTEN_HTTPS,
938                 ARG_STDIN,
939                 ARG_GETTER,
940                 ARG_COMPRESS,
941                 ARG_NO_COMPRESS,
942                 ARG_SEAL,
943                 ARG_NO_SEAL,
944                 ARG_KEY,
945                 ARG_CERT,
946                 ARG_TRUST,
947         };
948
949         static const struct option options[] = {
950                 { "help",         no_argument,       NULL, 'h'              },
951                 { "version",      no_argument,       NULL, ARG_VERSION      },
952                 { "url",          required_argument, NULL, ARG_URL          },
953                 { "getter",       required_argument, NULL, ARG_GETTER       },
954                 { "listen-raw",   required_argument, NULL, ARG_LISTEN_RAW   },
955                 { "listen-http",  required_argument, NULL, ARG_LISTEN_HTTP  },
956                 { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS },
957                 { "stdin",        no_argument,       NULL, ARG_STDIN        },
958                 { "output",       required_argument, NULL, 'o'              },
959                 { "compress",     no_argument,       NULL, ARG_COMPRESS     },
960                 { "no-compress",  no_argument,       NULL, ARG_NO_COMPRESS  },
961                 { "seal",         no_argument,       NULL, ARG_SEAL         },
962                 { "no-seal",      no_argument,       NULL, ARG_NO_SEAL      },
963                 { "key",          required_argument, NULL, ARG_KEY          },
964                 { "cert",         required_argument, NULL, ARG_CERT         },
965                 { "trust",        required_argument, NULL, ARG_TRUST        },
966                 {}
967         };
968
969         int c, r;
970
971         assert(argc >= 0);
972         assert(argv);
973
974         while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0)
975                 switch(c) {
976                 case 'h':
977                         help();
978                         return 0 /* done */;
979
980                 case ARG_VERSION:
981                         puts(PACKAGE_STRING);
982                         puts(SYSTEMD_FEATURES);
983                         return 0 /* done */;
984
985                 case ARG_URL:
986                         if (arg_url) {
987                                 log_error("cannot currently set more than one --url");
988                                 return -EINVAL;
989                         }
990
991                         arg_url = optarg;
992                         break;
993
994                 case ARG_GETTER:
995                         if (arg_getter) {
996                                 log_error("cannot currently use --getter more than once");
997                                 return -EINVAL;
998                         }
999
1000                         arg_getter = optarg;
1001                         break;
1002
1003                 case ARG_LISTEN_RAW:
1004                         if (arg_listen_raw) {
1005                                 log_error("cannot currently use --listen-raw more than once");
1006                                 return -EINVAL;
1007                         }
1008
1009                         arg_listen_raw = optarg;
1010                         break;
1011
1012                 case ARG_LISTEN_HTTP:
1013                         if (arg_listen_http) {
1014                                 log_error("cannot currently use --listen-http more than once");
1015                                 return -EINVAL;
1016                         }
1017
1018                         arg_listen_http = optarg;
1019                         break;
1020
1021                 case ARG_LISTEN_HTTPS:
1022                         if (arg_listen_https) {
1023                                 log_error("cannot currently use --listen-https more than once");
1024                                 return -EINVAL;
1025                         }
1026
1027                         arg_listen_https = optarg;
1028                         break;
1029
1030                 case ARG_KEY:
1031                         if (key_pem) {
1032                                 log_error("Key file specified twice");
1033                                 return -EINVAL;
1034                         }
1035                         r = read_full_file(optarg, &key_pem, NULL);
1036                         if (r < 0) {
1037                                 log_error("Failed to read key file: %s", strerror(-r));
1038                                 return r;
1039                         }
1040                         assert(key_pem);
1041                         break;
1042
1043                 case ARG_CERT:
1044                         if (cert_pem) {
1045                                 log_error("Certificate file specified twice");
1046                                 return -EINVAL;
1047                         }
1048                         r = read_full_file(optarg, &cert_pem, NULL);
1049                         if (r < 0) {
1050                                 log_error("Failed to read certificate file: %s", strerror(-r));
1051                                 return r;
1052                         }
1053                         assert(cert_pem);
1054                         break;
1055
1056                 case ARG_TRUST:
1057 #ifdef HAVE_GNUTLS
1058                         if (trust_pem) {
1059                                 log_error("CA certificate file specified twice");
1060                                 return -EINVAL;
1061                         }
1062                         r = read_full_file(optarg, &trust_pem, NULL);
1063                         if (r < 0) {
1064                                 log_error("Failed to read CA certificate file: %s", strerror(-r));
1065                                 return r;
1066                         }
1067                         assert(trust_pem);
1068                         break;
1069 #else
1070                         log_error("Option --trust is not available.");
1071 #endif
1072
1073                 case ARG_STDIN:
1074                         arg_stdin = true;
1075                         break;
1076
1077                 case 'o':
1078                         if (arg_output) {
1079                                 log_error("cannot use --output/-o more than once");
1080                                 return -EINVAL;
1081                         }
1082
1083                         arg_output = optarg;
1084                         break;
1085
1086                 case ARG_COMPRESS:
1087                         arg_compress = true;
1088                         break;
1089                 case ARG_NO_COMPRESS:
1090                         arg_compress = false;
1091                         break;
1092                 case ARG_SEAL:
1093                         arg_seal = true;
1094                         break;
1095                 case ARG_NO_SEAL:
1096                         arg_seal = false;
1097                         break;
1098
1099                 case '?':
1100                         return -EINVAL;
1101
1102                 default:
1103                         log_error("Unknown option code %c", c);
1104                         return -EINVAL;
1105                 }
1106
1107         if (arg_listen_https && !(key_pem && cert_pem)) {
1108                 log_error("Options --key and --cert must be used when https sources are specified");
1109                 return -EINVAL;
1110         }
1111
1112         if (optind < argc) {
1113                 log_error("This program takes no positional arguments");
1114                 return -EINVAL;
1115         }
1116
1117         return 1 /* work to do */;
1118 }
1119
/*
 * Route GnuTLS log output through log_func_gnutls at GNUTLS_LOG_LEVEL.
 *
 * This is done only when the build has GnuTLS support and an HTTP or HTTPS
 * listener was requested on the command line; otherwise nothing happens.
 *
 * Always returns 0.
 */
static int setup_gnutls_logger(void) {
#ifdef HAVE_GNUTLS
        if (arg_listen_http || arg_listen_https) {
                gnutls_global_set_log_function(log_func_gnutls);
                gnutls_global_set_log_level(GNUTLS_LOG_LEVEL);
        }
#endif

        return 0;
}
1131
1132 int main(int argc, char **argv) {
1133         RemoteServer s = {};
1134         int r, r2;
1135
1136         log_set_max_level(LOG_DEBUG);
1137         log_show_color(true);
1138         log_parse_environment();
1139
1140         r = parse_argv(argc, argv);
1141         if (r <= 0)
1142                 return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1143
1144         r = setup_gnutls_logger();
1145         if (r < 0)
1146                 return EXIT_FAILURE;
1147
1148         if (remoteserver_init(&s) < 0)
1149                 return EXIT_FAILURE;
1150
1151         log_debug("%s running as pid %lu",
1152                   program_invocation_short_name, (unsigned long) getpid());
1153         sd_notify(false,
1154                   "READY=1\n"
1155                   "STATUS=Processing requests...");
1156
1157         while (s.active) {
1158                 r = sd_event_get_state(s.events);
1159                 if (r < 0)
1160                         break;
1161                 if (r == SD_EVENT_FINISHED)
1162                         break;
1163
1164                 r = sd_event_run(s.events, -1);
1165                 if (r < 0) {
1166                         log_error("Failed to run event loop: %s", strerror(-r));
1167                         break;
1168                 }
1169         }
1170
1171         log_info("Finishing after writing %" PRIu64 " entries", s.writer.seqnum);
1172         r2 = server_destroy(&s);
1173
1174         sd_notify(false, "STATUS=Shutting down...");
1175
1176         return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1177 }