chiark / gitweb /
util: replace close_pipe() with new safe_close_pair()
[elogind.git] / src / journal / journal-remote.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2012 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/prctl.h>
29 #include <sys/socket.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <unistd.h>
33 #include <getopt.h>
34
35 #include "sd-daemon.h"
36 #include "sd-event.h"
37 #include "journal-file.h"
38 #include "journald-native.h"
39 #include "socket-util.h"
40 #include "mkdir.h"
41 #include "build.h"
42 #include "macro.h"
43 #include "strv.h"
44 #include "fileio.h"
45 #include "socket-util.h"
46 #include "microhttpd-util.h"
47
48 #ifdef HAVE_GNUTLS
49 #include <gnutls/gnutls.h>
50 #endif
51
52 #include "journal-remote-parse.h"
53 #include "journal-remote-write.h"
54
/* printf-style template for the default output journal; it takes the
 * machine ID and a source-derived name (see open_output()). */
#define REMOTE_JOURNAL_PATH "/var/log/journal/" SD_ID128_FORMAT_STR "/remote-%s.journal"

/* Output file or directory; when unset, REMOTE_JOURNAL_PATH is used. */
static char *arg_output = NULL;
/* Base URL to fetch from; "/entries" is appended before fetching. */
static char *arg_url = NULL;
/* Command line that is split and executed instead of curl. */
static char *arg_getter = NULL;
/* Addresses to listen on for raw, HTTP and HTTPS connections. */
static char *arg_listen_raw = NULL;
static char *arg_listen_http = NULL;
static char *arg_listen_https = NULL;
/* Input files to read; an entry of "-" stands for standard input. */
static char **arg_files = NULL;
/* Flags handed to the journal writer. */
static int arg_compress = true;
static int arg_seal = false;
/* Numbers of passed-in descriptors to serve HTTP/HTTPS on; -1 means none. */
static int http_socket = -1;
static int https_socket = -1;

/* PEM data handed to microhttpd as HTTPS key, certificate and trust store. */
static char *key_pem = NULL;
static char *cert_pem = NULL;
static char *trust_pem = NULL;
71
72 /**********************************************************************
73  **********************************************************************
74  **********************************************************************/
75
/* Fork and exec `child` with argument vector `argv`, with the child's
 * standard output redirected into a pipe.
 *
 * Returns the read end of the pipe on success, or a negative errno-style
 * value when the pipe cannot be created or fork() fails. Failures inside
 * the child terminate the child via _exit(). */
static int spawn_child(const char* child, char** argv) {
        int fd[2];
        pid_t parent_pid, child_pid;
        int r;

        if (pipe(fd) < 0) {
                /* Capture errno before logging, which may overwrite it */
                r = -errno;
                log_error("Failed to create pipe: %m");
                return r;
        }

        parent_pid = getpid();

        child_pid = fork();
        if (child_pid < 0) {
                r = -errno;
                log_error("Failed to fork: %m");
                safe_close_pair(fd);
                return r;
        }

        /* In the child */
        if (child_pid == 0) {
                r = dup2(fd[1], STDOUT_FILENO);
                if (r < 0) {
                        log_error("Failed to dup pipe to stdout: %m");
                        _exit(EXIT_FAILURE);
                }

                /* stdout now refers to the pipe; the original pair is not needed */
                safe_close_pair(fd);

                /* Make sure the child goes away when the parent dies */
                if (prctl(PR_SET_PDEATHSIG, SIGTERM) < 0)
                        _exit(EXIT_FAILURE);

                /* Check whether our parent died before we were able
                 * to set the death signal */
                if (getppid() != parent_pid)
                        _exit(EXIT_SUCCESS);

                execvp(child, argv);
                log_error("Failed to exec child %s: %m", child);
                _exit(EXIT_FAILURE);
        }

        /* In the parent: only the read end is kept */
        r = close(fd[1]);
        if (r < 0)
                log_warning("Failed to close write end of pipe: %m");

        return fd[0];
}
126
/* Run curl on `url`, asking for the application/vnd.fdo.journal format.
 *
 * Returns whatever spawn_child() returns: a descriptor to read curl's
 * output from, or a negative errno-style value. */
static int spawn_curl(char* url) {
        char **argv = STRV_MAKE("curl",
                                "-HAccept: application/vnd.fdo.journal",
                                "--silent",
                                "--show-error",
                                url);
        int r;

        r = spawn_child("curl", argv);
        if (r < 0)
                /* The error is carried in r; errno may be stale by now */
                log_error("Failed to spawn curl: %s", strerror(-r));
        return r;
}
140
141 static int spawn_getter(char *getter, char *url) {
142         int r;
143         char _cleanup_strv_free_ **words = NULL;
144
145         assert(getter);
146         words = strv_split_quoted(getter);
147         if (!words)
148                 return log_oom();
149
150         r = spawn_child(words[0], words);
151         if (r < 0)
152                 log_error("Failed to spawn getter %s: %m", getter);
153
154         return r;
155 }
156
157 static int open_output(Writer *s, const char* url) {
158         char _cleanup_free_ *name, *output = NULL;
159         char *c;
160         int r;
161
162         assert(url);
163         name = strdup(url);
164         if (!name)
165                 return log_oom();
166
167         for(c = name; *c; c++) {
168                 if (*c == '/' || *c == ':' || *c == ' ')
169                         *c = '~';
170                 else if (*c == '?') {
171                         *c = '\0';
172                         break;
173                 }
174         }
175
176         if (!arg_output) {
177                 sd_id128_t machine;
178                 r = sd_id128_get_machine(&machine);
179                 if (r < 0) {
180                         log_error("failed to determine machine ID128: %s", strerror(-r));
181                         return r;
182                 }
183
184                 r = asprintf(&output, REMOTE_JOURNAL_PATH,
185                              SD_ID128_FORMAT_VAL(machine), name);
186                 if (r < 0)
187                         return log_oom();
188         } else {
189                 r = is_dir(arg_output);
190                 if (r > 0) {
191                         r = asprintf(&output,
192                                      "%s/remote-%s.journal", arg_output, name);
193                         if (r < 0)
194                                 return log_oom();
195                 } else {
196                         output = strdup(arg_output);
197                         if (!output)
198                                 return log_oom();
199                 }
200         }
201
202         r = journal_file_open_reliably(output,
203                                        O_RDWR|O_CREAT, 0640,
204                                        arg_compress, arg_seal,
205                                        &s->metrics,
206                                        s->mmap,
207                                        NULL, &s->journal);
208         if (r < 0)
209                 log_error("Failed to open output journal %s: %s",
210                           arg_output, strerror(-r));
211         else
212                 log_info("Opened output file %s", s->journal->path);
213         return r;
214 }
215
216 /**********************************************************************
217  **********************************************************************
218  **********************************************************************/
219
220 typedef struct MHDDaemonWrapper {
221         uint64_t fd;
222         struct MHD_Daemon *daemon;
223
224         sd_event_source *event;
225 } MHDDaemonWrapper;
226
227 typedef struct RemoteServer {
228         RemoteSource **sources;
229         ssize_t sources_size;
230         ssize_t active;
231
232         sd_event *events;
233         sd_event_source *sigterm_event, *sigint_event, *listen_event;
234
235         Writer writer;
236
237         Hashmap *daemons;
238 } RemoteServer;
239
240 /* This should go away as soon as Âµhttpd allows state to be passed around. */
241 static RemoteServer *server;
242
243 static int dispatch_raw_source_event(sd_event_source *event,
244                                      int fd,
245                                      uint32_t revents,
246                                      void *userdata);
247 static int dispatch_raw_connection_event(sd_event_source *event,
248                                          int fd,
249                                          uint32_t revents,
250                                          void *userdata);
251 static int dispatch_http_event(sd_event_source *event,
252                                int fd,
253                                uint32_t revents,
254                                void *userdata);
255
256 static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) {
257         assert(fd >= 0);
258         assert(source);
259
260         if (!GREEDY_REALLOC0_T(s->sources, s->sources_size, fd + 1))
261                 return log_oom();
262
263         if (s->sources[fd] == NULL) {
264                 s->sources[fd] = new0(RemoteSource, 1);
265                 if (!s->sources[fd])
266                         return log_oom();
267                 s->sources[fd]->fd = -1;
268                 s->active++;
269         }
270
271         *source = s->sources[fd];
272         return 0;
273 }
274
275 static int remove_source(RemoteServer *s, int fd) {
276         RemoteSource *source;
277
278         assert(s);
279         assert(fd >= 0);
280         assert(fd < s->sources_size);
281
282         source = s->sources[fd];
283         if (source) {
284                 source_free(source);
285                 s->sources[fd] = NULL;
286                 s->active--;
287         }
288
289         close(fd);
290
291         return 0;
292 }
293
294 static int add_source(RemoteServer *s, int fd, const char* name) {
295         RemoteSource *source = NULL;
296         char *realname;
297         int r;
298
299         assert(s);
300         assert(fd >= 0);
301
302         if (name) {
303                 realname = strdup(name);
304                 if (!realname)
305                         return log_oom();
306         } else {
307                 r = asprintf(&realname, "fd:%d", fd);
308                 if (r < 0)
309                         return log_oom();
310         }
311
312         log_debug("Creating source for fd:%d (%s)", fd, name);
313
314         r = get_source_for_fd(s, fd, &source);
315         if (r < 0) {
316                 log_error("Failed to create source for fd:%d (%s)", fd, name);
317                 return r;
318         }
319         assert(source);
320         assert(source->fd < 0);
321         source->fd = fd;
322
323         r = sd_event_add_io(s->events, &source->event,
324                             fd, EPOLLIN, dispatch_raw_source_event, s);
325         if (r < 0) {
326                 log_error("Failed to register event source for fd:%d: %s",
327                           fd, strerror(-r));
328                 goto error;
329         }
330
331         return 1; /* work to do */
332
333  error:
334         remove_source(s, fd);
335         return r;
336 }
337
338 static int add_raw_socket(RemoteServer *s, int fd) {
339         int r;
340
341         r = sd_event_add_io(s->events, &s->listen_event, fd, EPOLLIN,
342                             dispatch_raw_connection_event, s);
343         if (r < 0) {
344                 close(fd);
345                 return r;
346         }
347
348         s->active ++;
349         return 0;
350 }
351
352 static int setup_raw_socket(RemoteServer *s, const char *address) {
353         int fd;
354
355         fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
356         if (fd < 0)
357                 return fd;
358
359         return add_raw_socket(s, fd);
360 }
361
362 /**********************************************************************
363  **********************************************************************
364  **********************************************************************/
365
366 static RemoteSource *request_meta(void **connection_cls) {
367         RemoteSource *source;
368
369         assert(connection_cls);
370         if (*connection_cls)
371                 return *connection_cls;
372
373         source = new0(RemoteSource, 1);
374         if (!source)
375                 return NULL;
376         source->fd = -1;
377
378         log_debug("Added RemoteSource as connection metadata %p", source);
379
380         *connection_cls = source;
381         return source;
382 }
383
384 static void request_meta_free(void *cls,
385                               struct MHD_Connection *connection,
386                               void **connection_cls,
387                               enum MHD_RequestTerminationCode toe) {
388         RemoteSource *s;
389
390         assert(connection_cls);
391         s = *connection_cls;
392
393         log_debug("Cleaning up connection metadata %p", s);
394         source_free(s);
395         *connection_cls = NULL;
396 }
397
398 static int process_http_upload(
399                 struct MHD_Connection *connection,
400                 const char *upload_data,
401                 size_t *upload_data_size,
402                 RemoteSource *source) {
403
404         bool finished = false;
405         int r;
406
407         assert(source);
408
409         log_debug("request_handler_upload: connection %p, %zu bytes",
410                   connection, *upload_data_size);
411
412         if (*upload_data_size) {
413                 log_info("Received %zu bytes", *upload_data_size);
414
415                 r = push_data(source, upload_data, *upload_data_size);
416                 if (r < 0) {
417                         log_error("Failed to store received data of size %zu: %s",
418                                   *upload_data_size, strerror(-r));
419                         return mhd_respond_oom(connection);
420                 }
421                 *upload_data_size = 0;
422         } else
423                 finished = true;
424
425         while (true) {
426                 r = process_source(source, &server->writer, arg_compress, arg_seal);
427                 if (r == -E2BIG)
428                         log_warning("Entry too big, skipped");
429                 else if (r == -EAGAIN || r == -EWOULDBLOCK)
430                         break;
431                 else if (r < 0) {
432                         log_warning("Failed to process data for connection %p", connection);
433                         return mhd_respondf(connection, MHD_HTTP_UNPROCESSABLE_ENTITY,
434                                             "Processing failed: %s", strerror(-r));
435                 }
436         }
437
438         if (!finished)
439                 return MHD_YES;
440
441         /* The upload is finished */
442
443         if (source_non_empty(source)) {
444                 log_warning("EOF reached with incomplete data");
445                 return mhd_respond(connection, MHD_HTTP_EXPECTATION_FAILED,
446                                    "Trailing data not processed.");
447         }
448
449         return mhd_respond(connection, MHD_HTTP_ACCEPTED, "OK.\n");
450 };
451
452 static int request_handler(
453                 void *cls,
454                 struct MHD_Connection *connection,
455                 const char *url,
456                 const char *method,
457                 const char *version,
458                 const char *upload_data,
459                 size_t *upload_data_size,
460                 void **connection_cls) {
461
462         const char *header;
463         int r ,code;
464
465         assert(connection);
466         assert(connection_cls);
467         assert(url);
468         assert(method);
469
470         log_debug("Handling a connection %s %s %s", method, url, version);
471
472         if (*connection_cls)
473                 return process_http_upload(connection,
474                                            upload_data, upload_data_size,
475                                            *connection_cls);
476
477         if (!streq(method, "POST"))
478                 return mhd_respond(connection, MHD_HTTP_METHOD_NOT_ACCEPTABLE,
479                                    "Unsupported method.\n");
480
481         if (!streq(url, "/upload"))
482                 return mhd_respond(connection, MHD_HTTP_NOT_FOUND,
483                                    "Not found.\n");
484
485         header = MHD_lookup_connection_value(connection,
486                                              MHD_HEADER_KIND, "Content-Type");
487         if (!header || !streq(header, "application/vnd.fdo.journal"))
488                 return mhd_respond(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE,
489                                    "Content-Type: application/vnd.fdo.journal"
490                                    " is required.\n");
491
492         if (trust_pem) {
493                 r = check_permissions(connection, &code);
494                 if (r < 0)
495                         return code;
496         }
497
498         if (!request_meta(connection_cls))
499                 return respond_oom(connection);
500         return MHD_YES;
501 }
502
503 static int setup_microhttpd_server(RemoteServer *s, int fd, bool https) {
504         struct MHD_OptionItem opts[] = {
505                 { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free},
506                 { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger},
507                 { MHD_OPTION_LISTEN_SOCKET, fd},
508                 { MHD_OPTION_END},
509                 { MHD_OPTION_END},
510                 { MHD_OPTION_END},
511                 { MHD_OPTION_END}};
512         int opts_pos = 3;
513         int flags =
514                 MHD_USE_DEBUG |
515                 MHD_USE_PEDANTIC_CHECKS |
516                 MHD_USE_EPOLL_LINUX_ONLY |
517                 MHD_USE_DUAL_STACK;
518
519         const union MHD_DaemonInfo *info;
520         int r, epoll_fd;
521         MHDDaemonWrapper *d;
522
523         assert(fd >= 0);
524
525         r = fd_nonblock(fd, true);
526         if (r < 0) {
527                 log_error("Failed to make fd:%d nonblocking: %s", fd, strerror(-r));
528                 return r;
529         }
530
531         if (https) {
532                 opts[opts_pos++] = (struct MHD_OptionItem)
533                         {MHD_OPTION_HTTPS_MEM_KEY, 0, key_pem};
534                 opts[opts_pos++] = (struct MHD_OptionItem)
535                         {MHD_OPTION_HTTPS_MEM_CERT, 0, cert_pem};
536
537                 flags |= MHD_USE_SSL;
538
539                 if (trust_pem)
540                         opts[opts_pos++] = (struct MHD_OptionItem)
541                                 {MHD_OPTION_HTTPS_MEM_TRUST, 0, trust_pem};
542         }
543
544         d = new(MHDDaemonWrapper, 1);
545         if (!d)
546                 return log_oom();
547
548         d->fd = (uint64_t) fd;
549
550         d->daemon = MHD_start_daemon(flags, 0,
551                                      NULL, NULL,
552                                      request_handler, NULL,
553                                      MHD_OPTION_ARRAY, opts,
554                                      MHD_OPTION_END);
555         if (!d->daemon) {
556                 log_error("Failed to start Âµhttp daemon");
557                 r = -EINVAL;
558                 goto error;
559         }
560
561         log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)",
562                   https ? "HTTPS" : "HTTP", fd, d);
563
564
565         info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY);
566         if (!info) {
567                 log_error("µhttp returned NULL daemon info");
568                 r = -ENOTSUP;
569                 goto error;
570         }
571
572         epoll_fd = info->listen_fd;
573         if (epoll_fd < 0) {
574                 log_error("µhttp epoll fd is invalid");
575                 r = -EUCLEAN;
576                 goto error;
577         }
578
579         r = sd_event_add_io(s->events, &d->event,
580                             epoll_fd, EPOLLIN, dispatch_http_event, d);
581         if (r < 0) {
582                 log_error("Failed to add event callback: %s", strerror(-r));
583                 goto error;
584         }
585
586         r = hashmap_ensure_allocated(&s->daemons, uint64_hash_func, uint64_compare_func);
587         if (r < 0) {
588                 log_oom();
589                 goto error;
590         }
591
592         r = hashmap_put(s->daemons, &d->fd, d);
593         if (r < 0) {
594                 log_error("Failed to add daemon to hashmap: %s", strerror(-r));
595                 goto error;
596         }
597
598         s->active ++;
599         return 0;
600
601 error:
602         MHD_stop_daemon(d->daemon);
603         free(d->daemon);
604         free(d);
605         return r;
606 }
607
608 static int setup_microhttpd_socket(RemoteServer *s,
609                                    const char *address,
610                                    bool https) {
611         int fd;
612
613         fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC);
614         if (fd < 0)
615                 return fd;
616
617         return setup_microhttpd_server(s, fd, https);
618 }
619
620 static int dispatch_http_event(sd_event_source *event,
621                                int fd,
622                                uint32_t revents,
623                                void *userdata) {
624         MHDDaemonWrapper *d = userdata;
625         int r;
626
627         assert(d);
628
629         log_info("%s", __func__);
630
631         r = MHD_run(d->daemon);
632         if (r == MHD_NO) {
633                 log_error("MHD_run failed!");
634                 // XXX: unregister daemon
635                 return -EINVAL;
636         }
637
638         return 1; /* work to do */
639 }
640
641 /**********************************************************************
642  **********************************************************************
643  **********************************************************************/
644
645 static int dispatch_sigterm(sd_event_source *event,
646                             const struct signalfd_siginfo *si,
647                             void *userdata) {
648         RemoteServer *s = userdata;
649
650         assert(s);
651
652         log_received_signal(LOG_INFO, si);
653
654         sd_event_exit(s->events, 0);
655         return 0;
656 }
657
658 static int setup_signals(RemoteServer *s) {
659         sigset_t mask;
660         int r;
661
662         assert(s);
663
664         assert_se(sigemptyset(&mask) == 0);
665         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
666         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
667
668         r = sd_event_add_signal(s->events, &s->sigterm_event, SIGTERM, dispatch_sigterm, s);
669         if (r < 0)
670                 return r;
671
672         r = sd_event_add_signal(s->events, &s->sigint_event, SIGINT, dispatch_sigterm, s);
673         if (r < 0)
674                 return r;
675
676         return 0;
677 }
678
/* Parse `spec` as an integer written with a leading minus sign.
 *
 * A negative number -N yields N. Zero and positive numbers yield
 * -ENOENT. A string that safe_atoi() rejects yields safe_atoi()'s
 * error. (Judging by the name, N is presumably a file descriptor
 * number; the callers are not visible here.) */
static int fd_fd(const char *spec) {
        int value;
        int r = safe_atoi(spec, &value);

        if (r < 0)
                return r;

        return value < 0 ? -value : -ENOENT;
}
691
692
693 static int remoteserver_init(RemoteServer *s) {
694         int r, n, fd;
695         const char *output_name = NULL;
696         char **file;
697
698         assert(s);
699
700         sd_event_default(&s->events);
701
702         setup_signals(s);
703
704         assert(server == NULL);
705         server = s;
706
707         n = sd_listen_fds(true);
708         if (n < 0) {
709                 log_error("Failed to read listening file descriptors from environment: %s",
710                           strerror(-n));
711                 return n;
712         } else
713                 log_info("Received %d descriptors", n);
714
715         if (MAX(http_socket, https_socket) >= SD_LISTEN_FDS_START + n) {
716                 log_error("Received fewer sockets than expected");
717                 return -EBADFD;
718         }
719
720         for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) {
721                 if (sd_is_socket(fd, AF_UNSPEC, 0, false)) {
722                         log_info("Received a listening socket (fd:%d)", fd);
723
724                         if (fd == http_socket)
725                                 r = setup_microhttpd_server(s, fd, false);
726                         else if (fd == https_socket)
727                                 r = setup_microhttpd_server(s, fd, true);
728                         else
729                                 r = add_raw_socket(s, fd);
730                 } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) {
731                         log_info("Received a connection socket (fd:%d)", fd);
732
733                         r = add_source(s, fd, NULL);
734                 } else {
735                         log_error("Unknown socket passed on fd:%d", fd);
736
737                         return -EINVAL;
738                 }
739
740                 if(r < 0) {
741                         log_error("Failed to register socket (fd:%d): %s",
742                                   fd, strerror(-r));
743                         return r;
744                 }
745
746                 output_name = "socket";
747         }
748
749         if (arg_url) {
750                 char _cleanup_free_ *url = NULL;
751                 char _cleanup_strv_free_ **urlv = strv_new(arg_url, "/entries", NULL);
752                 if (!urlv)
753                         return log_oom();
754                 url = strv_join(urlv, "");
755                 if (!url)
756                         return log_oom();
757
758                 if (arg_getter) {
759                         log_info("Spawning getter %s...", url);
760                         fd = spawn_getter(arg_getter, url);
761                 } else {
762                         log_info("Spawning curl %s...", url);
763                         fd = spawn_curl(url);
764                 }
765                 if (fd < 0)
766                         return fd;
767
768                 r = add_source(s, fd, arg_url);
769                 if (r < 0)
770                         return r;
771
772                 output_name = arg_url;
773         }
774
775         if (arg_listen_raw) {
776                 log_info("Listening on a socket...");
777                 r = setup_raw_socket(s, arg_listen_raw);
778                 if (r < 0)
779                         return r;
780
781                 output_name = arg_listen_raw;
782         }
783
784         if (arg_listen_http) {
785                 r = setup_microhttpd_socket(s, arg_listen_http, false);
786                 if (r < 0)
787                         return r;
788
789                 output_name = arg_listen_http;
790         }
791
792         if (arg_listen_https) {
793                 r = setup_microhttpd_socket(s, arg_listen_https, true);
794                 if (r < 0)
795                         return r;
796
797                 output_name = arg_listen_https;
798         }
799
800         STRV_FOREACH(file, arg_files) {
801                 if (streq(*file, "-")) {
802                         log_info("Reading standard input...");
803
804                         fd = STDIN_FILENO;
805                         output_name = "stdin";
806                 } else {
807                         log_info("Reading file %s...", *file);
808
809                         fd = open(*file, O_RDONLY|O_CLOEXEC|O_NOCTTY|O_NONBLOCK);
810                         if (fd < 0) {
811                                 log_error("Failed to open %s: %m", *file);
812                                 return -errno;
813                         }
814                         output_name = *file;
815                 }
816
817                 r = add_source(s, fd, output_name);
818                 if (r < 0)
819                         return r;
820         }
821
822         if (s->active == 0) {
823                 log_error("Zarro sources specified");
824                 return -EINVAL;
825         }
826
827         if (!!n + !!arg_url + !!arg_listen_raw + !!arg_files)
828                 output_name = "multiple";
829
830         r = writer_init(&s->writer);
831         if (r < 0)
832                 return r;
833
834         r = open_output(&s->writer, output_name);
835         return r;
836 }
837
838 static int server_destroy(RemoteServer *s) {
839         int r;
840         ssize_t i;
841         MHDDaemonWrapper *d;
842
843         r = writer_close(&s->writer);
844
845         while ((d = hashmap_steal_first(s->daemons))) {
846                 MHD_stop_daemon(d->daemon);
847                 sd_event_source_unref(d->event);
848                 free(d);
849         }
850
851         hashmap_free(s->daemons);
852
853         assert(s->sources_size == 0 || s->sources);
854         for (i = 0; i < s->sources_size; i++)
855                 remove_source(s, i);
856
857         free(s->sources);
858
859         sd_event_source_unref(s->sigterm_event);
860         sd_event_source_unref(s->sigint_event);
861         sd_event_source_unref(s->listen_event);
862         sd_event_unref(s->events);
863
864         /* fds that we're listening on remain open... */
865
866         return r;
867 }
868
869 /**********************************************************************
870  **********************************************************************
871  **********************************************************************/
872
873 static int dispatch_raw_source_event(sd_event_source *event,
874                                      int fd,
875                                      uint32_t revents,
876                                      void *userdata) {
877
878         RemoteServer *s = userdata;
879         RemoteSource *source;
880         int r;
881
882         assert(fd < s->sources_size);
883         source = s->sources[fd];
884         assert(source->fd == fd);
885
886         r = process_source(source, &s->writer, arg_compress, arg_seal);
887         if (source->state == STATE_EOF) {
888                 log_info("EOF reached with source fd:%d (%s)",
889                          source->fd, source->name);
890                 if (source_non_empty(source))
891                         log_warning("EOF reached with incomplete data");
892                 remove_source(s, source->fd);
893                 log_info("%zd active source remaining", s->active);
894         } else if (r == -E2BIG) {
895                 log_error("Entry too big, skipped");
896                 r = 1;
897         }
898
899         return r;
900 }
901
902 static int accept_connection(const char* type, int fd, SocketAddress *addr) {
903         int fd2, r;
904
905         log_debug("Accepting new %s connection on fd:%d", type, fd);
906         fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
907         if (fd2 < 0) {
908                 log_error("accept() on fd:%d failed: %m", fd);
909                 return -errno;
910         }
911
912         switch(socket_address_family(addr)) {
913         case AF_INET:
914         case AF_INET6: {
915                 char* _cleanup_free_ a = NULL;
916
917                 r = socket_address_print(addr, &a);
918                 if (r < 0) {
919                         log_error("socket_address_print(): %s", strerror(-r));
920                         close(fd2);
921                         return r;
922                 }
923
924                 log_info("Accepted %s %s connection from %s",
925                          type,
926                          socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
927                          a);
928
929                 return fd2;
930         };
931         default:
932                 log_error("Rejected %s connection with unsupported family %d",
933                           type, socket_address_family(addr));
934                 close(fd2);
935
936                 return -EINVAL;
937         }
938 }
939
940 static int dispatch_raw_connection_event(sd_event_source *event,
941                                          int fd,
942                                          uint32_t revents,
943                                          void *userdata) {
944         RemoteServer *s = userdata;
945         int fd2;
946         SocketAddress addr = {
947                 .size = sizeof(union sockaddr_union),
948                 .type = SOCK_STREAM,
949         };
950
951         fd2 = accept_connection("raw", fd, &addr);
952         if (fd2 < 0)
953                 return fd2;
954
955         return add_source(s, fd2, NULL);
956 }
957
958 /**********************************************************************
959  **********************************************************************
960  **********************************************************************/
961
/* Print the command line usage summary to stdout.
 *
 * Every long option accepted by parse_argv() is listed here, including
 * --key, --cert and --trust. Always returns 0. */
static int help(void) {
        printf("%s [OPTIONS...] {FILE|-}...\n\n"
               "Write external journal events to a journal file.\n\n"
               "Options:\n"
               "  --url=URL            Read events from systemd-journal-gatewayd at URL\n"
               "  --getter=COMMAND     Read events from the output of COMMAND\n"
               "  --listen-raw=ADDR    Listen for connections at ADDR\n"
               "  --listen-http=ADDR   Listen for HTTP connections at ADDR\n"
               "  --listen-https=ADDR  Listen for HTTPS connections at ADDR\n"
               "  --key=FILENAME       Specify key in PEM format\n"
               "  --cert=FILENAME      Specify certificate in PEM format\n"
               "  --trust=FILENAME     Specify CA certificate in PEM format\n"
               "  -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n"
               "  --[no-]compress      Use XZ-compression in the output journal (default: yes)\n"
               "  --[no-]seal          Use Event sealing in the output journal (default: no)\n"
               "  -h --help            Show this help and exit\n"
               "  --version            Print version string and exit\n"
               "\n"
               "Note: file descriptors from sd_listen_fds() will be consumed, too.\n"
               , program_invocation_short_name);

        return 0;
}
982
983 static int parse_argv(int argc, char *argv[]) {
984         enum {
985                 ARG_VERSION = 0x100,
986                 ARG_URL,
987                 ARG_LISTEN_RAW,
988                 ARG_LISTEN_HTTP,
989                 ARG_LISTEN_HTTPS,
990                 ARG_GETTER,
991                 ARG_COMPRESS,
992                 ARG_NO_COMPRESS,
993                 ARG_SEAL,
994                 ARG_NO_SEAL,
995                 ARG_KEY,
996                 ARG_CERT,
997                 ARG_TRUST,
998         };
999
1000         static const struct option options[] = {
1001                 { "help",         no_argument,       NULL, 'h'              },
1002                 { "version",      no_argument,       NULL, ARG_VERSION      },
1003                 { "url",          required_argument, NULL, ARG_URL          },
1004                 { "getter",       required_argument, NULL, ARG_GETTER       },
1005                 { "listen-raw",   required_argument, NULL, ARG_LISTEN_RAW   },
1006                 { "listen-http",  required_argument, NULL, ARG_LISTEN_HTTP  },
1007                 { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS },
1008                 { "output",       required_argument, NULL, 'o'              },
1009                 { "compress",     no_argument,       NULL, ARG_COMPRESS     },
1010                 { "no-compress",  no_argument,       NULL, ARG_NO_COMPRESS  },
1011                 { "seal",         no_argument,       NULL, ARG_SEAL         },
1012                 { "no-seal",      no_argument,       NULL, ARG_NO_SEAL      },
1013                 { "key",          required_argument, NULL, ARG_KEY          },
1014                 { "cert",         required_argument, NULL, ARG_CERT         },
1015                 { "trust",        required_argument, NULL, ARG_TRUST        },
1016                 {}
1017         };
1018
1019         int c, r;
1020
1021         assert(argc >= 0);
1022         assert(argv);
1023
1024         while ((c = getopt_long(argc, argv, "ho:", options, NULL)) >= 0)
1025                 switch(c) {
1026                 case 'h':
1027                         help();
1028                         return 0 /* done */;
1029
1030                 case ARG_VERSION:
1031                         puts(PACKAGE_STRING);
1032                         puts(SYSTEMD_FEATURES);
1033                         return 0 /* done */;
1034
1035                 case ARG_URL:
1036                         if (arg_url) {
1037                                 log_error("cannot currently set more than one --url");
1038                                 return -EINVAL;
1039                         }
1040
1041                         arg_url = optarg;
1042                         break;
1043
1044                 case ARG_GETTER:
1045                         if (arg_getter) {
1046                                 log_error("cannot currently use --getter more than once");
1047                                 return -EINVAL;
1048                         }
1049
1050                         arg_getter = optarg;
1051                         break;
1052
1053                 case ARG_LISTEN_RAW:
1054                         if (arg_listen_raw) {
1055                                 log_error("cannot currently use --listen-raw more than once");
1056                                 return -EINVAL;
1057                         }
1058
1059                         arg_listen_raw = optarg;
1060                         break;
1061
1062                 case ARG_LISTEN_HTTP:
1063                         if (arg_listen_http || http_socket >= 0) {
1064                                 log_error("cannot currently use --listen-http more than once");
1065                                 return -EINVAL;
1066                         }
1067
1068                         r = fd_fd(optarg);
1069                         if (r >= 0)
1070                                 http_socket = r;
1071                         else if (r == -ENOENT)
1072                                 arg_listen_http = optarg;
1073                         else {
1074                                 log_error("Invalid port/fd specification %s: %s",
1075                                           optarg, strerror(-r));
1076                                 return -EINVAL;
1077                         }
1078
1079                         break;
1080
1081                 case ARG_LISTEN_HTTPS:
1082                         if (arg_listen_https || https_socket >= 0) {
1083                                 log_error("cannot currently use --listen-https more than once");
1084                                 return -EINVAL;
1085                         }
1086
1087                         r = fd_fd(optarg);
1088                         if (r >= 0)
1089                                 https_socket = r;
1090                         else if (r == -ENOENT)
1091                                 arg_listen_https = optarg;
1092                         else {
1093                                 log_error("Invalid port/fd specification %s: %s",
1094                                           optarg, strerror(-r));
1095                                 return -EINVAL;
1096                         }
1097
1098                         break;
1099
1100                 case ARG_KEY:
1101                         if (key_pem) {
1102                                 log_error("Key file specified twice");
1103                                 return -EINVAL;
1104                         }
1105                         r = read_full_file(optarg, &key_pem, NULL);
1106                         if (r < 0) {
1107                                 log_error("Failed to read key file: %s", strerror(-r));
1108                                 return r;
1109                         }
1110                         assert(key_pem);
1111                         break;
1112
1113                 case ARG_CERT:
1114                         if (cert_pem) {
1115                                 log_error("Certificate file specified twice");
1116                                 return -EINVAL;
1117                         }
1118                         r = read_full_file(optarg, &cert_pem, NULL);
1119                         if (r < 0) {
1120                                 log_error("Failed to read certificate file: %s", strerror(-r));
1121                                 return r;
1122                         }
1123                         assert(cert_pem);
1124                         break;
1125
1126                 case ARG_TRUST:
1127 #ifdef HAVE_GNUTLS
1128                         if (trust_pem) {
1129                                 log_error("CA certificate file specified twice");
1130                                 return -EINVAL;
1131                         }
1132                         r = read_full_file(optarg, &trust_pem, NULL);
1133                         if (r < 0) {
1134                                 log_error("Failed to read CA certificate file: %s", strerror(-r));
1135                                 return r;
1136                         }
1137                         assert(trust_pem);
1138                         break;
1139 #else
1140                         log_error("Option --trust is not available.");
1141 #endif
1142
1143                 case 'o':
1144                         if (arg_output) {
1145                                 log_error("cannot use --output/-o more than once");
1146                                 return -EINVAL;
1147                         }
1148
1149                         arg_output = optarg;
1150                         break;
1151
1152                 case ARG_COMPRESS:
1153                         arg_compress = true;
1154                         break;
1155                 case ARG_NO_COMPRESS:
1156                         arg_compress = false;
1157                         break;
1158                 case ARG_SEAL:
1159                         arg_seal = true;
1160                         break;
1161                 case ARG_NO_SEAL:
1162                         arg_seal = false;
1163                         break;
1164
1165                 case '?':
1166                         return -EINVAL;
1167
1168                 default:
1169                         log_error("Unknown option code %c", c);
1170                         return -EINVAL;
1171                 }
1172
1173         if (arg_listen_https && !(key_pem && cert_pem)) {
1174                 log_error("Options --key and --cert must be used when https sources are specified");
1175                 return -EINVAL;
1176         }
1177
1178         if (optind < argc)
1179                 arg_files = argv + optind;
1180
1181         return 1 /* work to do */;
1182 }
1183
/* Route GnuTLS log output through log_func_gnutls at GNUTLS_LOG_LEVEL.
 *
 * Only done when an HTTP or HTTPS listen address was given on the command
 * line and the build has HAVE_GNUTLS; otherwise this is a no-op.
 * Always returns 0. */
static int setup_gnutls_logger(void) {
#ifdef HAVE_GNUTLS
        if (arg_listen_http || arg_listen_https) {
                gnutls_global_set_log_function(log_func_gnutls);
                gnutls_global_set_log_level(GNUTLS_LOG_LEVEL);
        }
#endif

        return 0;
}
1195
1196 int main(int argc, char **argv) {
1197         RemoteServer s = {};
1198         int r, r2;
1199
1200         log_set_max_level(LOG_DEBUG);
1201         log_show_color(true);
1202         log_parse_environment();
1203
1204         r = parse_argv(argc, argv);
1205         if (r <= 0)
1206                 return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1207
1208         r = setup_gnutls_logger();
1209         if (r < 0)
1210                 return EXIT_FAILURE;
1211
1212         if (remoteserver_init(&s) < 0)
1213                 return EXIT_FAILURE;
1214
1215         log_debug("%s running as pid %lu",
1216                   program_invocation_short_name, (unsigned long) getpid());
1217         sd_notify(false,
1218                   "READY=1\n"
1219                   "STATUS=Processing requests...");
1220
1221         while (s.active) {
1222                 r = sd_event_get_state(s.events);
1223                 if (r < 0)
1224                         break;
1225                 if (r == SD_EVENT_FINISHED)
1226                         break;
1227
1228                 r = sd_event_run(s.events, -1);
1229                 if (r < 0) {
1230                         log_error("Failed to run event loop: %s", strerror(-r));
1231                         break;
1232                 }
1233         }
1234
1235         log_info("Finishing after writing %" PRIu64 " entries", s.writer.seqnum);
1236         r2 = server_destroy(&s);
1237
1238         sd_notify(false, "STATUS=Shutting down...");
1239
1240         return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE;
1241 }