chiark / gitweb /
469735663cf85b5b79d1e7164b34bb80d7fc2510
[elogind.git] / src / journal / journal-remote.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2012 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/prctl.h>
29 #include <sys/socket.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <unistd.h>
33 #include <getopt.h>
34
35 #include "sd-daemon.h"
36 #include "sd-event.h"
37 #include "journal-file.h"
38 #include "journald-native.h"
39 #include "socket-util.h"
40 #include "mkdir.h"
41 #include "build.h"
42 #include "macro.h"
43 #include "strv.h"
44 #include "fileio.h"
45 #include "socket-util.h"
46 #include "microhttpd-util.h"
47
48 #ifdef HAVE_GNUTLS
49 #include <gnutls/gnutls.h>
50 #endif
51
52 #include "journal-remote-parse.h"
53 #include "journal-remote-write.h"
54
/* Default output path template: machine ID, then a name derived from the
 * source (filled in by open_output()). */
#define REMOTE_JOURNAL_PATH "/var/log/journal/" SD_ID128_FORMAT_STR "/remote-%s.journal"

/* Output file, or directory in which the output file is created. */
static char *arg_output = NULL;
/* Base URL to pull from; "/entries" is appended before fetching. */
static char *arg_url = NULL;
/* Command to spawn instead of curl when pulling from arg_url. */
static char *arg_getter = NULL;
/* Addresses to listen on for raw, HTTP and HTTPS connections. */
static char *arg_listen_raw = NULL;
static char *arg_listen_http = NULL;
static char *arg_listen_https = NULL;
/* Input files to read; "-" stands for standard input. */
static char **arg_files = NULL;
/* Flags handed to the journal writer. */
static int arg_compress = true;
static int arg_seal = false;
/* Passed-in descriptors that are to be served by microhttpd (-1: none). */
static int http_socket = -1;
static int https_socket = -1;

/* PEM blobs handed to microhttpd for HTTPS. */
static char *key_pem = NULL;
static char *cert_pem = NULL;
static char *trust_pem = NULL;
71
72 /**********************************************************************
73  **********************************************************************
74  **********************************************************************/
75
/*
 * Fork and exec `child` (looked up via execvp) with argument vector `argv`,
 * with the child's standard output redirected into a pipe.
 *
 * Returns the read end of the pipe (>= 0) on success, or a negative errno
 * value if the pipe could not be created or fork() failed. Failures inside
 * the child (dup2, prctl, exec) only terminate the child; the parent still
 * gets a valid descriptor.
 */
static int spawn_child(const char* child, char** argv) {
        int fd[2];
        pid_t parent_pid, child_pid;
        int r;

        if (pipe(fd) < 0) {
                /* Save errno before logging so the returned code cannot be clobbered. */
                r = -errno;
                log_error("Failed to create pipe: %m");
                return r;
        }

        parent_pid = getpid();

        child_pid = fork();
        if (child_pid < 0) {
                r = -errno;
                log_error("Failed to fork: %m");
                close_pipe(fd);
                return r;
        }

        /* In the child */
        if (child_pid == 0) {
                r = dup2(fd[1], STDOUT_FILENO);
                if (r < 0) {
                        log_error("Failed to dup pipe to stdout: %m");
                        _exit(EXIT_FAILURE);
                }

                /* stdout now refers to the pipe; the original ends are not needed. */
                r = close_pipe(fd);
                if (r < 0)
                        log_warning("Failed to close pipe fds: %m");

                /* Make sure the child goes away when the parent dies */
                if (prctl(PR_SET_PDEATHSIG, SIGTERM) < 0)
                        _exit(EXIT_FAILURE);

                /* Check whether our parent died before we were able
                 * to set the death signal */
                if (getppid() != parent_pid)
                        _exit(EXIT_SUCCESS);

                execvp(child, argv);
                log_error("Failed to exec child %s: %m", child);
                _exit(EXIT_FAILURE);
        }

        /* In the parent: keep only the read end. */
        r = close(fd[1]);
        if (r < 0)
                log_warning("Failed to close write end of pipe: %m");

        return fd[0];
}
128
/*
 * Spawn "curl" fetching `url` with an Accept header for
 * application/vnd.fdo.journal.
 *
 * Returns what spawn_child() returns: the read end of the pipe connected to
 * curl's stdout, or a negative errno value.
 */
static int spawn_curl(char* url) {
        char **argv = STRV_MAKE("curl",
                                "-HAccept: application/vnd.fdo.journal",
                                "--silent",
                                "--show-error",
                                url);
        int r;

        r = spawn_child("curl", argv);
        if (r < 0)
                /* The error is carried in r; errno may have changed since. */
                log_error("Failed to spawn curl: %s", strerror(-r));
        return r;
}
142
143 static int spawn_getter(char *getter, char *url) {
144         int r;
145         char _cleanup_strv_free_ **words = NULL;
146
147         assert(getter);
148         words = strv_split_quoted(getter);
149         if (!words)
150                 return log_oom();
151
152         r = spawn_child(words[0], words);
153         if (r < 0)
154                 log_error("Failed to spawn getter %s: %m", getter);
155
156         return r;
157 }
158
159 static int open_output(Writer *s, const char* url) {
160         char _cleanup_free_ *name, *output = NULL;
161         char *c;
162         int r;
163
164         assert(url);
165         name = strdup(url);
166         if (!name)
167                 return log_oom();
168
169         for(c = name; *c; c++) {
170                 if (*c == '/' || *c == ':' || *c == ' ')
171                         *c = '~';
172                 else if (*c == '?') {
173                         *c = '\0';
174                         break;
175                 }
176         }
177
178         if (!arg_output) {
179                 sd_id128_t machine;
180                 r = sd_id128_get_machine(&machine);
181                 if (r < 0) {
182                         log_error("failed to determine machine ID128: %s", strerror(-r));
183                         return r;
184                 }
185
186                 r = asprintf(&output, REMOTE_JOURNAL_PATH,
187                              SD_ID128_FORMAT_VAL(machine), name);
188                 if (r < 0)
189                         return log_oom();
190         } else {
191                 r = is_dir(arg_output);
192                 if (r > 0) {
193                         r = asprintf(&output,
194                                      "%s/remote-%s.journal", arg_output, name);
195                         if (r < 0)
196                                 return log_oom();
197                 } else {
198                         output = strdup(arg_output);
199                         if (!output)
200                                 return log_oom();
201                 }
202         }
203
204         r = journal_file_open_reliably(output,
205                                        O_RDWR|O_CREAT, 0640,
206                                        arg_compress, arg_seal,
207                                        &s->metrics,
208                                        s->mmap,
209                                        NULL, &s->journal);
210         if (r < 0)
211                 log_error("Failed to open output journal %s: %s",
212                           arg_output, strerror(-r));
213         else
214                 log_info("Opened output file %s", s->journal->path);
215         return r;
216 }
217
218 /**********************************************************************
219  **********************************************************************
220  **********************************************************************/
221
222 typedef struct MHDDaemonWrapper {
223         uint64_t fd;
224         struct MHD_Daemon *daemon;
225
226         sd_event_source *event;
227 } MHDDaemonWrapper;
228
229 typedef struct RemoteServer {
230         RemoteSource **sources;
231         ssize_t sources_size;
232         ssize_t active;
233
234         sd_event *events;
235         sd_event_source *sigterm_event, *sigint_event, *listen_event;
236
237         Writer writer;
238
239         Hashmap *daemons;
240 } RemoteServer;
241
242 /* This should go away as soon as µhttpd allows state to be passed around. */
243 static RemoteServer *server;
244
245 static int dispatch_raw_source_event(sd_event_source *event,
246                                      int fd,
247                                      uint32_t revents,
248                                      void *userdata);
249 static int dispatch_raw_connection_event(sd_event_source *event,
250                                          int fd,
251                                          uint32_t revents,
252                                          void *userdata);
253 static int dispatch_http_event(sd_event_source *event,
254                                int fd,
255                                uint32_t revents,
256                                void *userdata);
257
258 static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) {
259         assert(fd >= 0);
260         assert(source);
261
262         if (!GREEDY_REALLOC0_T(s->sources, s->sources_size, fd + 1))
263                 return log_oom();
264
265         if (s->sources[fd] == NULL) {
266                 s->sources[fd] = new0(RemoteSource, 1);
267                 if (!s->sources[fd])
268                         return log_oom();
269                 s->sources[fd]->fd = -1;
270                 s->active++;
271         }
272
273         *source = s->sources[fd];
274         return 0;
275 }
276
277 static int remove_source(RemoteServer *s, int fd) {
278         RemoteSource *source;
279
280         assert(s);
281         assert(fd >= 0);
282         assert(fd < s->sources_size);
283
284         source = s->sources[fd];
285         if (source) {
286                 source_free(source);
287                 s->sources[fd] = NULL;
288                 s->active--;
289         }
290
291         close(fd);
292
293         return 0;
294 }
295
296 static int add_source(RemoteServer *s, int fd, const char* name) {
297         RemoteSource *source = NULL;
298         char *realname;
299         int r;
300
301         assert(s);
302         assert(fd >= 0);
303
304         if (name) {
305                 realname = strdup(name);
306                 if (!realname)
307                         return log_oom();
308         } else {
309                 r = asprintf(&realname, "fd:%d", fd);
310                 if (r < 0)
311                         return log_oom();
312         }
313
314         log_debug("Creating source for fd:%d (%s)", fd, name);
315
316         r = get_source_for_fd(s, fd, &source);
317         if (r < 0) {
318                 log_error("Failed to create source for fd:%d (%s)", fd, name);
319                 return r;
320         }
321         assert(source);
322         assert(source->fd < 0);
323         source->fd = fd;
324
325         r = sd_event_add_io(s->events, &source->event,
326                             fd, EPOLLIN, dispatch_raw_source_event, s);
327         if (r < 0) {
328                 log_error("Failed to register event source for fd:%d: %s",
329                           fd, strerror(-r));
330                 goto error;
331         }
332
333         return 1; /* work to do */
334
335  error:
336         remove_source(s, fd);
337         return r;
338 }
339
340 static int add_raw_socket(RemoteServer *s, int fd) {
341         int r;
342
343         r = sd_event_add_io(s->events, &s->listen_event, fd, EPOLLIN,
344                             dispatch_raw_connection_event, s);
345         if (r < 0) {
346                 close(fd);
347                 return r;
348         }
349
350         s->active ++;
351         return 0;
352 }
353
354 static int setup_raw_socket(RemoteServer *s, const char *address) {
355         int fd;
356
357         fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
358         if (fd < 0)
359                 return fd;
360
361         return add_raw_socket(s, fd);
362 }
363
364 /**********************************************************************
365  **********************************************************************
366  **********************************************************************/
367
368 static RemoteSource *request_meta(void **connection_cls) {
369         RemoteSource *source;
370
371         assert(connection_cls);
372         if (*connection_cls)
373                 return *connection_cls;
374
375         source = new0(RemoteSource, 1);
376         if (!source)
377                 return NULL;
378         source->fd = -1;
379
380         log_debug("Added RemoteSource as connection metadata %p", source);
381
382         *connection_cls = source;
383         return source;
384 }
385
386 static void request_meta_free(void *cls,
387                               struct MHD_Connection *connection,
388                               void **connection_cls,
389                               enum MHD_RequestTerminationCode toe) {
390         RemoteSource *s;
391
392         assert(connection_cls);
393         s = *connection_cls;
394
395         log_debug("Cleaning up connection metadata %p", s);
396         source_free(s);
397         *connection_cls = NULL;
398 }
399
400 static int process_http_upload(
401                 struct MHD_Connection *connection,
402                 const char *upload_data,
403                 size_t *upload_data_size,
404                 RemoteSource *source) {
405
406         bool finished = false;
407         int r;
408
409         assert(source);
410
411         log_debug("request_handler_upload: connection %p, %zu bytes",
412                   connection, *upload_data_size);
413
414         if (*upload_data_size) {
415                 log_info("Received %zu bytes", *upload_data_size);
416
417                 r = push_data(source, upload_data, *upload_data_size);
418                 if (r < 0) {
419                         log_error("Failed to store received data of size %zu: %s",
420                                   *upload_data_size, strerror(-r));
421                         return mhd_respond_oom(connection);
422                 }
423                 *upload_data_size = 0;
424         } else
425                 finished = true;
426
427         while (true) {
428                 r = process_source(source, &server->writer, arg_compress, arg_seal);
429                 if (r == -E2BIG)
430                         log_warning("Entry too big, skipped");
431                 else if (r == -EAGAIN || r == -EWOULDBLOCK)
432                         break;
433                 else if (r < 0) {
434                         log_warning("Failed to process data for connection %p", connection);
435                         return mhd_respondf(connection, MHD_HTTP_UNPROCESSABLE_ENTITY,
436                                             "Processing failed: %s", strerror(-r));
437                 }
438         }
439
440         if (!finished)
441                 return MHD_YES;
442
443         /* The upload is finished */
444
445         if (source_non_empty(source)) {
446                 log_warning("EOF reached with incomplete data");
447                 return mhd_respond(connection, MHD_HTTP_EXPECTATION_FAILED,
448                                    "Trailing data not processed.");
449         }
450
451         return mhd_respond(connection, MHD_HTTP_ACCEPTED, "OK.\n");
452 };
453
454 static int request_handler(
455                 void *cls,
456                 struct MHD_Connection *connection,
457                 const char *url,
458                 const char *method,
459                 const char *version,
460                 const char *upload_data,
461                 size_t *upload_data_size,
462                 void **connection_cls) {
463
464         const char *header;
465         int r ,code;
466
467         assert(connection);
468         assert(connection_cls);
469         assert(url);
470         assert(method);
471
472         log_debug("Handling a connection %s %s %s", method, url, version);
473
474         if (*connection_cls)
475                 return process_http_upload(connection,
476                                            upload_data, upload_data_size,
477                                            *connection_cls);
478
479         if (!streq(method, "POST"))
480                 return mhd_respond(connection, MHD_HTTP_METHOD_NOT_ACCEPTABLE,
481                                    "Unsupported method.\n");
482
483         if (!streq(url, "/upload"))
484                 return mhd_respond(connection, MHD_HTTP_NOT_FOUND,
485                                    "Not found.\n");
486
487         header = MHD_lookup_connection_value(connection,
488                                              MHD_HEADER_KIND, "Content-Type");
489         if (!header || !streq(header, "application/vnd.fdo.journal"))
490                 return mhd_respond(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
491                                    "Content-Type: application/vnd.fdo.journal"
492                                    " is required.\n");
493
494         if (trust_pem) {
495                 r = check_permissions(connection, &code);
496                 if (r < 0)
497                         return code;
498         }
499
500         if (!request_meta(connection_cls))
501                 return respond_oom(connection);
502         return MHD_YES;
503 }
504
505 static int setup_microhttpd_server(RemoteServer *s, int fd, bool https) {
506         struct MHD_OptionItem opts[] = {
507                 { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free},
508                 { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger},
509                 { MHD_OPTION_LISTEN_SOCKET, fd},
510                 { MHD_OPTION_END},
511                 { MHD_OPTION_END},
512                 { MHD_OPTION_END},
513                 { MHD_OPTION_END}};
514         int opts_pos = 3;
515         int flags =
516                 MHD_USE_DEBUG |
517                 MHD_USE_PEDANTIC_CHECKS |
518                 MHD_USE_EPOLL_LINUX_ONLY |
519                 MHD_USE_DUAL_STACK;
520
521         const union MHD_DaemonInfo *info;
522         int r, epoll_fd;
523         MHDDaemonWrapper *d;
524
525         assert(fd >= 0);
526
527         r = fd_nonblock(fd, true);
528         if (r < 0) {
529                 log_error("Failed to make fd:%d nonblocking: %s", fd, strerror(-r));
530                 return r;
531         }
532
533         if (https) {
534                 opts[opts_pos++] = (struct MHD_OptionItem)
535                         {MHD_OPTION_HTTPS_MEM_KEY, 0, key_pem};
536                 opts[opts_pos++] = (struct MHD_OptionItem)
537                         {MHD_OPTION_HTTPS_MEM_CERT, 0, cert_pem};
538
539                 flags |= MHD_USE_SSL;
540
541                 if (trust_pem)
542                         opts[opts_pos++] = (struct MHD_OptionItem)
543                                 {MHD_OPTION_HTTPS_MEM_TRUST, 0, trust_pem};
544         }
545
546         d = new(MHDDaemonWrapper, 1);
547         if (!d)
548                 return log_oom();
549
550         d->fd = (uint64_t) fd;
551
552         d->daemon = MHD_start_daemon(flags, 0,
553                                      NULL, NULL,
554                                      request_handler, NULL,
555                                      MHD_OPTION_ARRAY, opts,
556                                      MHD_OPTION_END);
557         if (!d->daemon) {
558                 log_error("Failed to start Âµhttp daemon");
559                 r = -EINVAL;
560                 goto error;
561         }
562
563         log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)",
564                   https ? "HTTPS" : "HTTP", fd, d);
565
566
567         info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY);
568         if (!info) {
569                 log_error("µhttp returned NULL daemon info");
570                 r = -ENOTSUP;
571                 goto error;
572         }
573
574         epoll_fd = info->listen_fd;
575         if (epoll_fd < 0) {
576                 log_error("µhttp epoll fd is invalid");
577                 r = -EUCLEAN;
578                 goto error;
579         }
580
581         r = sd_event_add_io(s->events, &d->event,
582                             epoll_fd, EPOLLIN, dispatch_http_event, d);
583         if (r < 0) {
584                 log_error("Failed to add event callback: %s", strerror(-r));
585                 goto error;
586         }
587
588         r = hashmap_ensure_allocated(&s->daemons, uint64_hash_func, uint64_compare_func);
589         if (r < 0) {
590                 log_oom();
591                 goto error;
592         }
593
594         r = hashmap_put(s->daemons, &d->fd, d);
595         if (r < 0) {
596                 log_error("Failed to add daemon to hashmap: %s", strerror(-r));
597                 goto error;
598         }
599
600         s->active ++;
601         return 0;
602
603 error:
604         MHD_stop_daemon(d->daemon);
605         free(d->daemon);
606         free(d);
607         return r;
608 }
609
610 static int setup_microhttpd_socket(RemoteServer *s,
611                                    const char *address,
612                                    bool https) {
613         int fd;
614
615         fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
616         if (fd < 0)
617                 return fd;
618
619         return setup_microhttpd_server(s, fd, https);
620 }
621
622 static int dispatch_http_event(sd_event_source *event,
623                                int fd,
624                                uint32_t revents,
625                                void *userdata) {
626         MHDDaemonWrapper *d = userdata;
627         int r;
628
629         assert(d);
630
631         log_info("%s", __func__);
632
633         r = MHD_run(d->daemon);
634         if (r == MHD_NO) {
635                 log_error("MHD_run failed!");
636                 // XXX: unregister daemon
637                 return -EINVAL;
638         }
639
640         return 1; /* work to do */
641 }
642
643 /**********************************************************************
644  **********************************************************************
645  **********************************************************************/
646
647 static int dispatch_sigterm(sd_event_source *event,
648                             const struct signalfd_siginfo *si,
649                             void *userdata) {
650         RemoteServer *s = userdata;
651
652         assert(s);
653
654         log_received_signal(LOG_INFO, si);
655
656         sd_event_exit(s->events, 0);
657         return 0;
658 }
659
660 static int setup_signals(RemoteServer *s) {
661         sigset_t mask;
662         int r;
663
664         assert(s);
665
666         assert_se(sigemptyset(&mask) == 0);
667         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
668         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
669
670         r = sd_event_add_signal(s->events, &s->sigterm_event, SIGTERM, dispatch_sigterm, s);
671         if (r < 0)
672                 return r;
673
674         r = sd_event_add_signal(s->events, &s->sigint_event, SIGINT, dispatch_sigterm, s);
675         if (r < 0)
676                 return r;
677
678         return 0;
679 }
680
/*
 * Parse `spec` as an integer and, if it is negative, return its absolute
 * value (so "-3" yields 3).
 *
 * Returns the error from safe_atoi() if `spec` is not an integer, and
 * -ENOENT if the number is zero or positive.
 * NOTE(review): presumably a negative number is how a caller names an
 * already-open descriptor instead of an address — confirm at the call site.
 */
static int fd_fd(const char *spec) {
        int value;
        int r = safe_atoi(spec, &value);

        if (r < 0)
                return r;

        return value < 0 ? -value : -ENOENT;
}
693
694
695 static int remoteserver_init(RemoteServer *s) {
696         int r, n, fd;
697         const char *output_name = NULL;
698         char **file;
699
700         assert(s);
701
702         sd_event_default(&s->events);
703
704         setup_signals(s);
705
706         assert(server == NULL);
707         server = s;
708
709         n = sd_listen_fds(true);
710         if (n < 0) {
711                 log_error("Failed to read listening file descriptors from environment: %s",
712                           strerror(-n));
713                 return n;
714         } else
715                 log_info("Received %d descriptors", n);
716
717         if (MAX(http_socket, https_socket) >= SD_LISTEN_FDS_START + n) {
718                 log_error("Received fewer sockets than expected");
719                 return -EBADFD;
720         }
721
722         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
723                 if (sd_is_socket(fd, AF_UNSPEC, 0, false)) {
724                         log_info("Received a listening socket (fd:%d)", fd);
725
726                         if (fd == http_socket)
727                                 r = setup_microhttpd_server(s, fd, false);
728                         else if (fd == https_socket)
729                                 r = setup_microhttpd_server(s, fd, true);
730                         else
731                                 r = add_raw_socket(s, fd);
732                 } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
733                         log_info("Received a connection socket (fd:%d)", fd);
734
735                         r = add_source(s, fd, NULL);
736                 } else {
737                         log_error("Unknown socket passed on fd:%d", fd);
738
739                         return -EINVAL;
740                 }
741
742                 if(r < 0) {
743                         log_error("Failed to register socket (fd:%d): %s",
744                                   fd, strerror(-r));
745                         return r;
746                 }
747
748                 output_name = "socket";
749         }
750
751         if (arg_url) {
752                 char _cleanup_free_ *url = NULL;
753                 char _cleanup_strv_free_ **urlv = strv_new(arg_url, "/entries", NULL);
754                 if (!urlv)
755                         return log_oom();
756                 url = strv_join(urlv, "");
757                 if (!url)
758                         return log_oom();
759
760                 if (arg_getter) {
761                         log_info("Spawning getter %s...", url);
762                         fd = spawn_getter(arg_getter, url);
763                 } else {
764                         log_info("Spawning curl %s...", url);
765                         fd = spawn_curl(url);
766                 }
767                 if (fd < 0)
768                         return fd;
769
770                 r = add_source(s, fd, arg_url);
771                 if (r < 0)
772                         return r;
773
774                 output_name = arg_url;
775         }
776
777         if (arg_listen_raw) {
778                 log_info("Listening on a socket...");
779                 r = setup_raw_socket(s, arg_listen_raw);
780                 if (r < 0)
781                         return r;
782
783                 output_name = arg_listen_raw;
784         }
785
786         if (arg_listen_http) {
787                 r = setup_microhttpd_socket(s, arg_listen_http, false);
788                 if (r < 0)
789                         return r;
790
791                 output_name = arg_listen_http;
792         }
793
794         if (arg_listen_https) {
795                 r = setup_microhttpd_socket(s, arg_listen_https, true);
796                 if (r < 0)
797                         return r;
798
799                 output_name = arg_listen_https;
800         }
801
802         STRV_FOREACH(file, arg_files) {
803                 if (streq(*file, "-")) {
804                         log_info("Reading standard input...");
805
806                         fd = STDIN_FILENO;
807                         output_name = "stdin";
808                 } else {
809                         log_info("Reading file %s...", *file);
810
811                         fd = open(*file, O_RDONLY|O_CLOEXEC|O_NOCTTY|O_NONBLOCK);
812                         if (fd < 0) {
813                                 log_error("Failed to open %s: %m", *file);
814                                 return -errno;
815                         }
816                         output_name = *file;
817                 }
818
819                 r = add_source(s, fd, output_name);
820                 if (r < 0)
821                         return r;
822         }
823
824         if (s->active == 0) {
825                 log_error("Zarro sources specified");
826                 return -EINVAL;
827         }
828
829         if (!!n + !!arg_url + !!arg_listen_raw + !!arg_files)
830                 output_name = "multiple";
831
832         r = writer_init(&s->writer);
833         if (r < 0)
834                 return r;
835
836         r = open_output(&s->writer, output_name);
837         return r;
838 }
839
840 static int server_destroy(RemoteServer *s) {
841         int r;
842         ssize_t i;
843         MHDDaemonWrapper *d;
844
845         r = writer_close(&s->writer);
846
847         while ((d = hashmap_steal_first(s->daemons))) {
848                 MHD_stop_daemon(d->daemon);
849                 sd_event_source_unref(d->event);
850                 free(d);
851         }
852
853         hashmap_free(s->daemons);
854
855         assert(s->sources_size == 0 || s->sources);
856         for (i = 0; i < s->sources_size; i++)
857                 remove_source(s, i);
858
859         free(s->sources);
860
861         sd_event_source_unref(s->sigterm_event);
862         sd_event_source_unref(s->sigint_event);
863         sd_event_source_unref(s->listen_event);
864         sd_event_unref(s->events);
865
866         /* fds that we're listening on remain open... */
867
868         return r;
869 }
870
871 /**********************************************************************
872  **********************************************************************
873  **********************************************************************/
874
875 static int dispatch_raw_source_event(sd_event_source *event,
876                                      int fd,
877                                      uint32_t revents,
878                                      void *userdata) {
879
880         RemoteServer *s = userdata;
881         RemoteSource *source;
882         int r;
883
884         assert(fd < s->sources_size);
885         source = s->sources[fd];
886         assert(source->fd == fd);
887
888         r = process_source(source, &s->writer, arg_compress, arg_seal);
889         if (source->state == STATE_EOF) {
890                 log_info("EOF reached with source fd:%d (%s)",
891                          source->fd, source->name);
892                 if (source_non_empty(source))
893                         log_warning("EOF reached with incomplete data");
894                 remove_source(s, source->fd);
895                 log_info("%zd active source remaining", s->active);
896         } else if (r == -E2BIG) {
897                 log_error("Entry too big, skipped");
898                 r = 1;
899         }
900
901         return r;
902 }
903
904 static int accept_connection(const char* type, int fd, SocketAddress *addr) {
905         int fd2, r;
906
907         log_debug("Accepting new %s connection on fd:%d", type, fd);
908         fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
909         if (fd2 < 0) {
910                 log_error("accept() on fd:%d failed: %m", fd);
911                 return -errno;
912         }
913
914         switch(socket_address_family(addr)) {
915         case AF_INET:
916         case AF_INET6: {
917                 char* _cleanup_free_ a = NULL;
918
919                 r = socket_address_print(addr, &a);
920                 if (r < 0) {
921                         log_error("socket_address_print(): %s", strerror(-r));
922                         close(fd2);
923                         return r;
924                 }
925
926                 log_info("Accepted %s %s connection from %s",
927                          type,
928                          socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
929                          a);
930
931                 return fd2;
932         };
933         default:
934                 log_error("Rejected %s connection with unsupported family %d",
935                           type, socket_address_family(addr));
936                 close(fd2);
937
938                 return -EINVAL;
939         }
940 }
941
942 static int dispatch_raw_connection_event(sd_event_source *event,
943                                          int fd,
944                                          uint32_t revents,
945                                          void *userdata) {
946         RemoteServer *s = userdata;
947         int fd2;
948         SocketAddress addr = {
949                 .size = sizeof(union sockaddr_union),
950                 .type = SOCK_STREAM,
951         };
952
953         fd2 = accept_connection("raw", fd, &addr);
954         if (fd2 < 0)
955                 return fd2;
956
957         return add_source(s, fd2, NULL);
958 }
959
960 /**********************************************************************
961  **********************************************************************
962  **********************************************************************/
963
/* Print the command line usage summary to stdout.
 *
 * Every option accepted by parse_argv() is listed here, including the
 * --key/--cert/--trust options used together with --listen-https.
 * Always returns 0. */
static int help(void) {
        printf("%s [OPTIONS...] {FILE|-}...\n\n"
               "Write external journal events to a journal file.\n\n"
               "Options:\n"
               "  --url=URL            Read events from systemd-journal-gatewayd at URL\n"
               "  --getter=COMMAND     Read events from the output of COMMAND\n"
               "  --listen-raw=ADDR    Listen for connections at ADDR\n"
               "  --listen-http=ADDR   Listen for HTTP connections at ADDR\n"
               "  --listen-https=ADDR  Listen for HTTPS connections at ADDR\n"
               "  --key=FILENAME       Specify key in PEM format\n"
               "  --cert=FILENAME      Specify certificate in PEM format\n"
               "  --trust=FILENAME     Specify CA certificate in PEM format\n"
               "  -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n"
               "  --[no-]compress      Use XZ-compression in the output journal (default: yes)\n"
               "  --[no-]seal          Use Event sealing in the output journal (default: no)\n"
               "  -h --help            Show this help and exit\n"
               "  --version            Print version string and exit\n"
               "\n"
               "Note: file descriptors from sd_listen_fds() will be consumed, too.\n"
               , program_invocation_short_name);

        return 0;
}
984
985 static int parse_argv(int argc, char *argv[]) {
986         enum {
987                 ARG_VERSION = 0x100,
988                 ARG_URL,
989                 ARG_LISTEN_RAW,
990                 ARG_LISTEN_HTTP,
991                 ARG_LISTEN_HTTPS,
992                 ARG_GETTER,
993                 ARG_COMPRESS,
994                 ARG_NO_COMPRESS,
995                 ARG_SEAL,
996                 ARG_NO_SEAL,
997                 ARG_KEY,
998                 ARG_CERT,
999                 ARG_TRUST,
1000         };
1001
1002         static const struct option options[] = {
1003                 { "help",         no_argument,       NULL, 'h'              },
1004                 { "version",      no_argument,       NULL, ARG_VERSION      },
1005                 { "url",          required_argument, NULL, ARG_URL          },
1006                 { "getter",       required_argument, NULL, ARG_GETTER       },
1007                 { "listen-raw",   required_argument, NULL, ARG_LISTEN_RAW   },
1008                 { "listen-http",  required_argument, NULL, ARG_LISTEN_HTTP  },
1009                 { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS },
1010                 { "output",       required_argument, NULL, 'o'              },
1011                 { "compress",     no_argument,       NULL, ARG_COMPRESS     },
1012                 { "no-compress",  no_argument,       NULL, ARG_NO_COMPRESS  },
1013                 { "seal",         no_argument,       NULL, ARG_SEAL         },
1014                 { "no-seal",      no_argument,       NULL, ARG_NO_SEAL      },
1015                 { "key",          required_argument, NULL, ARG_KEY          },
1016                 { "cert",         required_argument, NULL, ARG_CERT         },
1017                 { "trust",        required_argument, NULL, ARG_TRUST        },
1018                 {}
1019         };
1020
1021         int c, r;
1022
1023         assert(argc >= 0);
1024         assert(argv);
1025
1026         while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0)
1027                 switch(c) {
1028                 case 'h':
1029                         help();
1030                         return 0 /* done */;
1031
1032                 case ARG_VERSION:
1033                         puts(PACKAGE_STRING);
1034                         puts(SYSTEMD_FEATURES);
1035                         return 0 /* done */;
1036
1037                 case ARG_URL:
1038                         if (arg_url) {
1039                                 log_error("cannot currently set more than one --url");
1040                                 return -EINVAL;
1041                         }
1042
1043                         arg_url = optarg;
1044                         break;
1045
1046                 case ARG_GETTER:
1047                         if (arg_getter) {
1048                                 log_error("cannot currently use --getter more than once");
1049                                 return -EINVAL;
1050                         }
1051
1052                         arg_getter = optarg;
1053                         break;
1054
1055                 case ARG_LISTEN_RAW:
1056                         if (arg_listen_raw) {
1057                                 log_error("cannot currently use --listen-raw more than once");
1058                                 return -EINVAL;
1059                         }
1060
1061                         arg_listen_raw = optarg;
1062                         break;
1063
1064                 case ARG_LISTEN_HTTP:
1065                         if (arg_listen_http || http_socket >= 0) {
1066                                 log_error("cannot currently use --listen-http more than once");
1067                                 return -EINVAL;
1068                         }
1069
1070                         r = fd_fd(optarg);
1071                         if (r >= 0)
1072                                 http_socket = r;
1073                         else if (r == -ENOENT)
1074                                 arg_listen_http = optarg;
1075                         else {
1076                                 log_error("Invalid port/fd specification %s: %s",
1077                                           optarg, strerror(-r));
1078                                 return -EINVAL;
1079                         }
1080
1081                         break;
1082
1083                 case ARG_LISTEN_HTTPS:
1084                         if (arg_listen_https || https_socket >= 0) {
1085                                 log_error("cannot currently use --listen-https more than once");
1086                                 return -EINVAL;
1087                         }
1088
1089                         r = fd_fd(optarg);
1090                         if (r >= 0)
1091                                 https_socket = r;
1092                         else if (r == -ENOENT)
1093                                 arg_listen_https = optarg;
1094                         else {
1095                                 log_error("Invalid port/fd specification %s: %s",
1096                                           optarg, strerror(-r));
1097                                 return -EINVAL;
1098                         }
1099
1100                         break;
1101
1102                 case ARG_KEY:
1103                         if (key_pem) {
1104                                 log_error("Key file specified twice");
1105                                 return -EINVAL;
1106                         }
1107                         r = read_full_file(optarg, &key_pem, NULL);
1108                         if (r < 0) {
1109                                 log_error("Failed to read key file: %s", strerror(-r));
1110                                 return r;
1111                         }
1112                         assert(key_pem);
1113                         break;
1114
1115                 case ARG_CERT:
1116                         if (cert_pem) {
1117                                 log_error("Certificate file specified twice");
1118                                 return -EINVAL;
1119                         }
1120                         r = read_full_file(optarg, &cert_pem, NULL);
1121                         if (r < 0) {
1122                                 log_error("Failed to read certificate file: %s", strerror(-r));
1123                                 return r;
1124                         }
1125                         assert(cert_pem);
1126                         break;
1127
1128                 case ARG_TRUST:
1129 #ifdef HAVE_GNUTLS
1130                         if (trust_pem) {
1131                                 log_error("CA certificate file specified twice");
1132                                 return -EINVAL;
1133                         }
1134                         r = read_full_file(optarg, &trust_pem, NULL);
1135                         if (r < 0) {
1136                                 log_error("Failed to read CA certificate file: %s", strerror(-r));
1137                                 return r;
1138                         }
1139                         assert(trust_pem);
1140                         break;
1141 #else
1142                         log_error("Option --trust is not available.");
1143 #endif
1144
1145                 case 'o':
1146                         if (arg_output) {
1147                                 log_error("cannot use --output/-o more than once");
1148                                 return -EINVAL;
1149                         }
1150
1151                         arg_output = optarg;
1152                         break;
1153
1154                 case ARG_COMPRESS:
1155                         arg_compress = true;
1156                         break;
1157                 case ARG_NO_COMPRESS:
1158                         arg_compress = false;
1159                         break;
1160                 case ARG_SEAL:
1161                         arg_seal = true;
1162                         break;
1163                 case ARG_NO_SEAL:
1164                         arg_seal = false;
1165                         break;
1166
1167                 case '?':
1168                         return -EINVAL;
1169
1170                 default:
1171                         log_error("Unknown option code %c", c);
1172                         return -EINVAL;
1173                 }
1174
1175         if (arg_listen_https && !(key_pem && cert_pem)) {
1176                 log_error("Options --key and --cert must be used when https sources are specified");
1177                 return -EINVAL;
1178         }
1179
1180         if (optind < argc)
1181                 arg_files = argv + optind;
1182
1183         return 1 /* work to do */;
1184 }
1185
1186 static int setup_gnutls_logger(void) {
1187         if (!arg_listen_http && !arg_listen_https)
1188                 return 0;
1189
1190 #ifdef HAVE_GNUTLS
1191         gnutls_global_set_log_function(log_func_gnutls);
1192         gnutls_global_set_log_level(GNUTLS_LOG_LEVEL);
1193 #endif
1194
1195         return 0;
1196 }
1197
1198 int main(int argc, char **argv) {
1199         RemoteServer s = {};
1200         int r, r2;
1201
1202         log_set_max_level(LOG_DEBUG);
1203         log_show_color(true);
1204         log_parse_environment();
1205
1206         r = parse_argv(argc, argv);
1207         if (r <= 0)
1208                 return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1209
1210         r = setup_gnutls_logger();
1211         if (r < 0)
1212                 return EXIT_FAILURE;
1213
1214         if (remoteserver_init(&s) < 0)
1215                 return EXIT_FAILURE;
1216
1217         log_debug("%s running as pid %lu",
1218                   program_invocation_short_name, (unsigned long) getpid());
1219         sd_notify(false,
1220                   "READY=1\n"
1221                   "STATUS=Processing requests...");
1222
1223         while (s.active) {
1224                 r = sd_event_get_state(s.events);
1225                 if (r < 0)
1226                         break;
1227                 if (r == SD_EVENT_FINISHED)
1228                         break;
1229
1230                 r = sd_event_run(s.events, -1);
1231                 if (r < 0) {
1232                         log_error("Failed to run event loop: %s", strerror(-r));
1233                         break;
1234                 }
1235         }
1236
1237         log_info("Finishing after writing %" PRIu64 " entries", s.writer.seqnum);
1238         r2 = server_destroy(&s);
1239
1240         sd_notify(false, "STATUS=Shutting down...");
1241
1242         return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1243 }