chiark / gitweb /
journal-upload: do not require port to be set
[elogind.git] / src / journal-remote / journal-upload.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4   This file is part of systemd.
5
6   Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8   systemd is free software; you can redistribute it and/or modify it
9   under the terms of the GNU Lesser General Public License as published by
10   the Free Software Foundation; either version 2.1 of the License, or
11   (at your option) any later version.
12
13   systemd is distributed in the hope that it will be useful, but
14   WITHOUT ANY WARRANTY; without even the implied warranty of
15   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16   Lesser General Public License for more details.
17
18   You should have received a copy of the GNU Lesser General Public License
19   along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <stdio.h>
23 #include <curl/curl.h>
24 #include <sys/stat.h>
25 #include <fcntl.h>
26 #include <getopt.h>
27
28 #include "sd-daemon.h"
29
30 #include "log.h"
31 #include "util.h"
32 #include "build.h"
33 #include "fileio.h"
34 #include "conf-parser.h"
35 #include "journal-upload.h"
36
37 #define PRIV_KEY_FILE CERTIFICATE_ROOT "/private/journal-upload.pem"
38 #define CERT_FILE     CERTIFICATE_ROOT "/certs/journal-upload.pem"
39 #define TRUST_FILE    CERTIFICATE_ROOT "/ca/trusted.pem"
40 #define DEFAULT_PORT  19532
41
42 static const char* arg_url;
43
44 static void close_fd_input(Uploader *u);
45
46 static const char *arg_key = NULL;
47 static const char *arg_cert = NULL;
48 static const char *arg_trust = NULL;
49
50 static const char *arg_directory = NULL;
51 static char **arg_file = NULL;
52 static const char *arg_cursor = NULL;
53 static bool arg_after_cursor = false;
54 static int arg_journal_type = 0;
55 static const char *arg_machine = NULL;
56 static bool arg_merge = false;
57 static int arg_follow = -1;
58 static const char *arg_save_state = NULL;
59
60 #define SERVER_ANSWER_KEEP 2048
61
62 #define STATE_FILE "/var/lib/systemd/journal-upload/state"
63
/* Call curl_easy_setopt(curl, opt, value). On failure, log the libcurl error
 * text at the given log level and then execute cmd, which may be empty or a
 * jump statement such as "return -EXFULL" or "goto fail".
 *
 * A variable "CURLcode code" must be in scope at the call site; it receives
 * the result of the call.
 *
 * The body is wrapped in do { } while (0) so that the expansion plus the
 * trailing semicolon forms exactly one statement and is safe in an unbraced
 * if/else. Because of that wrapper, cmd must not be "break" or "continue". */
#define easy_setopt(curl, opt, value, level, cmd)                       \
        do {                                                            \
                code = curl_easy_setopt(curl, opt, value);              \
                if (code) {                                             \
                        log_full(level,                                 \
                                 "curl_easy_setopt " #opt " failed: %s", \
                                 curl_easy_strerror(code));             \
                        cmd;                                            \
                }                                                       \
        } while (0)
74
75 static size_t output_callback(char *buf,
76                               size_t size,
77                               size_t nmemb,
78                               void *userp) {
79         Uploader *u = userp;
80
81         assert(u);
82
83         log_debug("The server answers (%zu bytes): %.*s",
84                   size*nmemb, (int)(size*nmemb), buf);
85
86         if (nmemb && !u->answer) {
87                 u->answer = strndup(buf, size*nmemb);
88                 if (!u->answer)
89                         log_warning("Failed to store server answer (%zu bytes): %s",
90                                     size*nmemb, strerror(ENOMEM));
91         }
92
93         return size * nmemb;
94 }
95
96 static int update_cursor_state(Uploader *u) {
97         _cleanup_free_ char *temp_path = NULL;
98         _cleanup_fclose_ FILE *f = NULL;
99         int r;
100
101         if (!u->state_file || !u->last_cursor)
102                 return 0;
103
104         r = fopen_temporary(u->state_file, &f, &temp_path);
105         if (r < 0)
106                 goto finish;
107
108         fprintf(f,
109                 "# This is private data. Do not parse.\n"
110                 "LAST_CURSOR=%s\n",
111                 u->last_cursor);
112
113         fflush(f);
114
115         if (ferror(f) || rename(temp_path, u->state_file) < 0) {
116                 r = -errno;
117                 unlink(u->state_file);
118                 unlink(temp_path);
119         }
120
121 finish:
122         if (r < 0)
123                 log_error("Failed to save state %s: %s", u->state_file, strerror(-r));
124
125         return r;
126 }
127
128 static int load_cursor_state(Uploader *u) {
129         int r;
130
131         if (!u->state_file)
132                 return 0;
133
134         r = parse_env_file(u->state_file, NEWLINE,
135                            "LAST_CURSOR",  &u->last_cursor,
136                            NULL);
137
138         if (r < 0 && r != -ENOENT) {
139                 log_error("Failed to read state file %s: %s",
140                           u->state_file, strerror(-r));
141                 return r;
142         }
143
144         return 0;
145 }
146
147
148
149 int start_upload(Uploader *u,
150                  size_t (*input_callback)(void *ptr,
151                                           size_t size,
152                                           size_t nmemb,
153                                           void *userdata),
154                  void *data) {
155         CURLcode code;
156
157         assert(u);
158         assert(input_callback);
159
160         if (!u->header) {
161                 struct curl_slist *h;
162
163                 h = curl_slist_append(NULL, "Content-Type: application/vnd.fdo.journal");
164                 if (!h)
165                         return log_oom();
166
167                 h = curl_slist_append(h, "Transfer-Encoding: chunked");
168                 if (!h) {
169                         curl_slist_free_all(h);
170                         return log_oom();
171                 }
172
173                 h = curl_slist_append(h, "Accept: text/plain");
174                 if (!h) {
175                         curl_slist_free_all(h);
176                         return log_oom();
177                 }
178
179                 u->header = h;
180         }
181
182         if (!u->easy) {
183                 CURL *curl;
184
185                 curl = curl_easy_init();
186                 if (!curl) {
187                         log_error("Call to curl_easy_init failed.");
188                         return -ENOSR;
189                 }
190
191                 /* tell it to POST to the URL */
192                 easy_setopt(curl, CURLOPT_POST, 1L,
193                             LOG_ERR, return -EXFULL);
194
195                 easy_setopt(curl, CURLOPT_ERRORBUFFER, u->error,
196                             LOG_ERR, return -EXFULL);
197
198                 /* set where to write to */
199                 easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback,
200                             LOG_ERR, return -EXFULL);
201
202                 easy_setopt(curl, CURLOPT_WRITEDATA, data,
203                             LOG_ERR, return -EXFULL);
204
205                 /* set where to read from */
206                 easy_setopt(curl, CURLOPT_READFUNCTION, input_callback,
207                             LOG_ERR, return -EXFULL);
208
209                 easy_setopt(curl, CURLOPT_READDATA, data,
210                             LOG_ERR, return -EXFULL);
211
212                 /* use our special own mime type and chunked transfer */
213                 easy_setopt(curl, CURLOPT_HTTPHEADER, u->header,
214                             LOG_ERR, return -EXFULL);
215
216                 /* enable verbose for easier tracing */
217                 easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, );
218
219                 easy_setopt(curl, CURLOPT_USERAGENT,
220                             "systemd-journal-upload " PACKAGE_STRING,
221                             LOG_WARNING, );
222
223                 if (arg_key || startswith(u->url, "https://")) {
224                         easy_setopt(curl, CURLOPT_SSLKEY, arg_key ?: PRIV_KEY_FILE,
225                                     LOG_ERR, return -EXFULL);
226                         easy_setopt(curl, CURLOPT_SSLCERT, arg_cert ?: CERT_FILE,
227                                     LOG_ERR, return -EXFULL);
228                 }
229
230                 if (arg_trust || startswith(u->url, "https://"))
231                         easy_setopt(curl, CURLOPT_CAINFO, arg_trust ?: TRUST_FILE,
232                                     LOG_ERR, return -EXFULL);
233
234                 if (arg_key || arg_trust)
235                         easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1,
236                                     LOG_WARNING, );
237
238                 u->easy = curl;
239         } else {
240                 /* truncate the potential old error message */
241                 u->error[0] = '\0';
242
243                 free(u->answer);
244                 u->answer = 0;
245         }
246
247         /* upload to this place */
248         code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url);
249         if (code) {
250                 log_error("curl_easy_setopt CURLOPT_URL failed: %s",
251                           curl_easy_strerror(code));
252                 return -EXFULL;
253         }
254
255         u->uploading = true;
256
257         return 0;
258 }
259
260 static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
261         Uploader *u = userp;
262
263         ssize_t r;
264
265         assert(u);
266         assert(nmemb <= SSIZE_MAX / size);
267
268         if (u->input < 0)
269                 return 0;
270
271         r = read(u->input, buf, size * nmemb);
272         log_debug("%s: allowed %zu, read %zu", __func__, size*nmemb, r);
273
274         if (r > 0)
275                 return r;
276
277         u->uploading = false;
278         if (r == 0) {
279                 log_debug("Reached EOF");
280                 close_fd_input(u);
281                 return 0;
282         } else {
283                 log_error("Aborting transfer after read error on input: %m.");
284                 return CURL_READFUNC_ABORT;
285         }
286 }
287
288 static void close_fd_input(Uploader *u) {
289         assert(u);
290
291         if (u->input >= 0)
292                 close_nointr(u->input);
293         u->input = -1;
294         u->timeout = 0;
295 }
296
297 static int dispatch_fd_input(sd_event_source *event,
298                              int fd,
299                              uint32_t revents,
300                              void *userp) {
301         Uploader *u = userp;
302
303         assert(u);
304         assert(fd >= 0);
305
306         if (revents & EPOLLHUP) {
307                 log_debug("Received HUP");
308                 close_fd_input(u);
309                 return 0;
310         }
311
312         if (!(revents & EPOLLIN)) {
313                 log_warning("Unexpected poll event %"PRIu32".", revents);
314                 return -EINVAL;
315         }
316
317         if (u->uploading) {
318                 log_warning("dispatch_fd_input called when uploading, ignoring.");
319                 return 0;
320         }
321
322         return start_upload(u, fd_input_callback, u);
323 }
324
325 static int open_file_for_upload(Uploader *u, const char *filename) {
326         int fd, r = 0;
327
328         if (streq(filename, "-"))
329                 fd = STDIN_FILENO;
330         else {
331                 fd = open(filename, O_RDONLY|O_CLOEXEC|O_NOCTTY);
332                 if (fd < 0) {
333                         log_error("Failed to open %s: %m", filename);
334                         return -errno;
335                 }
336         }
337
338         u->input = fd;
339
340         if (arg_follow) {
341                 r = sd_event_add_io(u->events, &u->input_event,
342                                     fd, EPOLLIN, dispatch_fd_input, u);
343                 if (r < 0) {
344                         if (r != -EPERM || arg_follow > 0) {
345                                 log_error("Failed to register input event: %s", strerror(-r));
346                                 return r;
347                         }
348
349                         /* Normal files should just be consumed without polling. */
350                         r = start_upload(u, fd_input_callback, u);
351                 }
352         }
353
354         return r;
355 }
356
357 static int dispatch_sigterm(sd_event_source *event,
358                             const struct signalfd_siginfo *si,
359                             void *userdata) {
360         Uploader *u = userdata;
361
362         assert(u);
363
364         log_received_signal(LOG_INFO, si);
365
366         close_fd_input(u);
367         close_journal_input(u);
368
369         sd_event_exit(u->events, 0);
370         return 0;
371 }
372
373 static int setup_signals(Uploader *u) {
374         sigset_t mask;
375         int r;
376
377         assert(u);
378
379         assert_se(sigemptyset(&mask) == 0);
380         sigset_add_many(&mask, SIGINT, SIGTERM, -1);
381         assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
382
383         r = sd_event_add_signal(u->events, &u->sigterm_event, SIGTERM, dispatch_sigterm, u);
384         if (r < 0)
385                 return r;
386
387         r = sd_event_add_signal(u->events, &u->sigint_event, SIGINT, dispatch_sigterm, u);
388         if (r < 0)
389                 return r;
390
391         return 0;
392 }
393
394 static int setup_uploader(Uploader *u, const char *url, const char *state_file) {
395         int r;
396         const char *host, *proto = "";
397
398         assert(u);
399         assert(url);
400
401         memzero(u, sizeof(Uploader));
402         u->input = -1;
403
404         if (!(host = startswith(url, "http://")) && !(host = startswith(url, "https://"))) {
405                 host = url;
406                 proto = "https://";
407         }
408
409         if (strchr(host, ':'))
410                 u->url = strjoin(proto, url, "/upload", NULL);
411         else {
412                 char *t;
413                 size_t x;
414
415                 t = strdupa(url);
416                 x = strlen(t);
417                 while (x > 0 && t[x - 1] == '/')
418                         t[x - 1] = '\0';
419
420                 u->url = strjoin(proto, t, ":" STRINGIFY(DEFAULT_PORT), "/upload", NULL);
421         }
422         if (!u->url)
423                 return log_oom();
424
425         u->state_file = state_file;
426
427         r = sd_event_default(&u->events);
428         if (r < 0) {
429                 log_error("sd_event_default failed: %s", strerror(-r));
430                 return r;
431         }
432
433         r = setup_signals(u);
434         if (r < 0) {
435                 log_error("Failed to set up signals: %s", strerror(-r));
436                 return r;
437         }
438
439         return load_cursor_state(u);
440 }
441
442 static void destroy_uploader(Uploader *u) {
443         assert(u);
444
445         curl_easy_cleanup(u->easy);
446         curl_slist_free_all(u->header);
447         free(u->answer);
448
449         free(u->last_cursor);
450         free(u->current_cursor);
451
452         free(u->url);
453
454         u->input_event = sd_event_source_unref(u->input_event);
455
456         close_fd_input(u);
457         close_journal_input(u);
458
459         sd_event_source_unref(u->sigterm_event);
460         sd_event_source_unref(u->sigint_event);
461         sd_event_unref(u->events);
462 }
463
464 static int perform_upload(Uploader *u) {
465         CURLcode code;
466         long status;
467
468         assert(u);
469
470         code = curl_easy_perform(u->easy);
471         if (code) {
472                 log_error("Upload to %s failed: %.*s",
473                           u->url,
474                           u->error[0] ? (int) sizeof(u->error) : INT_MAX,
475                           u->error[0] ? u->error : curl_easy_strerror(code));
476                 return -EIO;
477         }
478
479         code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status);
480         if (code) {
481                 log_error("Failed to retrieve response code: %s",
482                           curl_easy_strerror(code));
483                 return -EUCLEAN;
484         }
485
486         if (status >= 300) {
487                 log_error("Upload to %s failed with code %lu: %s",
488                           u->url, status, strna(u->answer));
489                 return -EIO;
490         } else if (status < 200) {
491                 log_error("Upload to %s finished with unexpected code %lu: %s",
492                           u->url, status, strna(u->answer));
493                 return -EIO;
494         } else
495                 log_debug("Upload finished successfully with code %lu: %s",
496                           status, strna(u->answer));
497
498         free(u->last_cursor);
499         u->last_cursor = u->current_cursor;
500         u->current_cursor = NULL;
501
502         return update_cursor_state(u);
503 }
504
505 static int parse_config(void) {
506         const ConfigTableItem items[] = {
507                 { "Upload",  "URL",                    config_parse_string, 0, &arg_url    },
508                 { "Upload",  "ServerKeyFile",          config_parse_path,   0, &arg_key    },
509                 { "Upload",  "ServerCertificateFile",  config_parse_path,   0, &arg_cert   },
510                 { "Upload",  "TrustedCertificateFile", config_parse_path,   0, &arg_trust  },
511                 {}};
512
513         return config_parse(NULL, PKGSYSCONFDIR "/journal-upload.conf", NULL,
514                             "Upload\0",
515                             config_item_table_lookup, items,
516                             false, false, true, NULL);
517 }
518
519 static void help(void) {
520         printf("%s -u URL {FILE|-}...\n\n"
521                "Upload journal events to a remote server.\n\n"
522                "  -h --help                 Show this help\n"
523                "     --version              Show package version\n"
524                "  -u --url=URL              Upload to this address (default port "
525                                             STRINGIFY(DEFAULT_PORT) ")\n"
526                "     --key=FILENAME         Specify key in PEM format (default:\n"
527                "                            \"" PRIV_KEY_FILE "\")\n"
528                "     --cert=FILENAME        Specify certificate in PEM format (default:\n"
529                "                            \"" CERT_FILE "\")\n"
530                "     --trust=FILENAME|all   Specify CA certificate or disable checking (default:\n"
531                "                            \"" TRUST_FILE "\")\n"
532                "     --system               Use the system journal\n"
533                "     --user                 Use the user journal for the current user\n"
534                "  -m --merge                Use  all available journals\n"
535                "  -M --machine=CONTAINER    Operate on local container\n"
536                "  -D --directory=PATH       Use journal files from directory\n"
537                "     --file=PATH            Use this journal file\n"
538                "     --cursor=CURSOR        Start at the specified cursor\n"
539                "     --after-cursor=CURSOR  Start after the specified cursor\n"
540                "     --follow[=BOOL]        Do [not] wait for input\n"
541                "     --save-state[=FILE]    Save uploaded cursors (default \n"
542                "                            " STATE_FILE ")\n"
543                "  -h --help                 Show this help and exit\n"
544                "     --version              Print version string and exit\n"
545                , program_invocation_short_name);
546 }
547
548 static int parse_argv(int argc, char *argv[]) {
549         enum {
550                 ARG_VERSION = 0x100,
551                 ARG_KEY,
552                 ARG_CERT,
553                 ARG_TRUST,
554                 ARG_USER,
555                 ARG_SYSTEM,
556                 ARG_FILE,
557                 ARG_CURSOR,
558                 ARG_AFTER_CURSOR,
559                 ARG_FOLLOW,
560                 ARG_SAVE_STATE,
561         };
562
563         static const struct option options[] = {
564                 { "help",         no_argument,       NULL, 'h'                },
565                 { "version",      no_argument,       NULL, ARG_VERSION        },
566                 { "url",          required_argument, NULL, 'u'                },
567                 { "key",          required_argument, NULL, ARG_KEY            },
568                 { "cert",         required_argument, NULL, ARG_CERT           },
569                 { "trust",        required_argument, NULL, ARG_TRUST          },
570                 { "system",       no_argument,       NULL, ARG_SYSTEM         },
571                 { "user",         no_argument,       NULL, ARG_USER           },
572                 { "merge",        no_argument,       NULL, 'm'                },
573                 { "machine",      required_argument, NULL, 'M'                },
574                 { "directory",    required_argument, NULL, 'D'                },
575                 { "file",         required_argument, NULL, ARG_FILE           },
576                 { "cursor",       required_argument, NULL, ARG_CURSOR         },
577                 { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR   },
578                 { "follow",       optional_argument, NULL, ARG_FOLLOW         },
579                 { "save-state",   optional_argument, NULL, ARG_SAVE_STATE     },
580                 {}
581         };
582
583         int c, r;
584
585         assert(argc >= 0);
586         assert(argv);
587
588         opterr = 0;
589
590         while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0)
591                 switch(c) {
592                 case 'h':
593                         help();
594                         return 0 /* done */;
595
596                 case ARG_VERSION:
597                         puts(PACKAGE_STRING);
598                         puts(SYSTEMD_FEATURES);
599                         return 0 /* done */;
600
601                 case 'u':
602                         if (arg_url) {
603                                 log_error("cannot use more than one --url");
604                                 return -EINVAL;
605                         }
606
607                         arg_url = optarg;
608                         break;
609
610                 case ARG_KEY:
611                         if (arg_key) {
612                                 log_error("cannot use more than one --key");
613                                 return -EINVAL;
614                         }
615
616                         arg_key = optarg;
617                         break;
618
619                 case ARG_CERT:
620                         if (arg_cert) {
621                                 log_error("cannot use more than one --cert");
622                                 return -EINVAL;
623                         }
624
625                         arg_cert = optarg;
626                         break;
627
628                 case ARG_TRUST:
629                         if (arg_trust) {
630                                 log_error("cannot use more than one --trust");
631                                 return -EINVAL;
632                         }
633
634                         arg_trust = optarg;
635                         break;
636
637                 case ARG_SYSTEM:
638                         arg_journal_type |= SD_JOURNAL_SYSTEM;
639                         break;
640
641                 case ARG_USER:
642                         arg_journal_type |= SD_JOURNAL_CURRENT_USER;
643                         break;
644
645                 case 'm':
646                         arg_merge = true;
647                         break;
648
649                 case 'M':
650                         if (arg_machine) {
651                                 log_error("cannot use more than one --machine/-M");
652                                 return -EINVAL;
653                         }
654
655                         arg_machine = optarg;
656                         break;
657
658                 case 'D':
659                         if (arg_directory) {
660                                 log_error("cannot use more than one --directory/-D");
661                                 return -EINVAL;
662                         }
663
664                         arg_directory = optarg;
665                         break;
666
667                 case ARG_FILE:
668                         r = glob_extend(&arg_file, optarg);
669                         if (r < 0) {
670                                 log_error("Failed to add paths: %s", strerror(-r));
671                                 return r;
672                         };
673                         break;
674
675                 case ARG_CURSOR:
676                         if (arg_cursor) {
677                                 log_error("cannot use more than one --cursor/--after-cursor");
678                                 return -EINVAL;
679                         }
680
681                         arg_cursor = optarg;
682                         break;
683
684                 case ARG_AFTER_CURSOR:
685                         if (arg_cursor) {
686                                 log_error("cannot use more than one --cursor/--after-cursor");
687                                 return -EINVAL;
688                         }
689
690                         arg_cursor = optarg;
691                         arg_after_cursor = true;
692                         break;
693
694                 case ARG_FOLLOW:
695                         if (optarg) {
696                                 r = parse_boolean(optarg);
697                                 if (r < 0) {
698                                         log_error("Failed to parse --follow= parameter.");
699                                         return -EINVAL;
700                                 }
701
702                                 arg_follow = !!r;
703                         } else
704                                 arg_follow = true;
705
706                         break;
707
708                 case ARG_SAVE_STATE:
709                         arg_save_state = optarg ?: STATE_FILE;
710                         break;
711
712                 case '?':
713                         log_error("Unknown option %s.", argv[optind-1]);
714                         return -EINVAL;
715
716                 case ':':
717                         log_error("Missing argument to %s.", argv[optind-1]);
718                         return -EINVAL;
719
720                 default:
721                         assert_not_reached("Unhandled option code.");
722                 }
723
724         if (!arg_url) {
725                 log_error("Required --url/-u option missing.");
726                 return -EINVAL;
727         }
728
729         if (!!arg_key != !!arg_cert) {
730                 log_error("Options --key and --cert must be used together.");
731                 return -EINVAL;
732         }
733
734         if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type)) {
735                 log_error("Input arguments make no sense with journal input.");
736                 return -EINVAL;
737         }
738
739         return 1;
740 }
741
742 static int open_journal(sd_journal **j) {
743         int r;
744
745         if (arg_directory)
746                 r = sd_journal_open_directory(j, arg_directory, arg_journal_type);
747         else if (arg_file)
748                 r = sd_journal_open_files(j, (const char**) arg_file, 0);
749         else if (arg_machine)
750                 r = sd_journal_open_container(j, arg_machine, 0);
751         else
752                 r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type);
753         if (r < 0)
754                 log_error("Failed to open %s: %s",
755                           arg_directory ? arg_directory : arg_file ? "files" : "journal",
756                           strerror(-r));
757         return r;
758 }
759
760 int main(int argc, char **argv) {
761         Uploader u;
762         int r;
763         bool use_journal;
764
765         log_show_color(true);
766         log_parse_environment();
767
768         r = parse_config();
769         if (r < 0)
770                 goto finish;
771
772         r = parse_argv(argc, argv);
773         if (r <= 0)
774                 goto finish;
775
776         r = setup_uploader(&u, arg_url, arg_save_state);
777         if (r < 0)
778                 goto cleanup;
779
780         sd_event_set_watchdog(u.events, true);
781
782         log_debug("%s running as pid "PID_FMT,
783                   program_invocation_short_name, getpid());
784
785         use_journal = optind >= argc;
786         if (use_journal) {
787                 sd_journal *j;
788                 r = open_journal(&j);
789                 if (r < 0)
790                         goto finish;
791                 r = open_journal_for_upload(&u, j,
792                                             arg_cursor ?: u.last_cursor,
793                                             arg_cursor ? arg_after_cursor : true,
794                                             !!arg_follow);
795                 if (r < 0)
796                         goto finish;
797         }
798
799         sd_notify(false,
800                   "READY=1\n"
801                   "STATUS=Processing input...");
802
803         while (true) {
804                 if (use_journal) {
805                         if (!u.journal)
806                                 break;
807
808                         r = check_journal_input(&u);
809                 } else if (u.input < 0 && !use_journal) {
810                         if (optind >= argc)
811                                 break;
812
813                         log_debug("Using %s as input.", argv[optind]);
814                         r = open_file_for_upload(&u, argv[optind++]);
815                 }
816                 if (r < 0)
817                         goto cleanup;
818
819                 r = sd_event_get_state(u.events);
820                 if (r < 0)
821                         break;
822                 if (r == SD_EVENT_FINISHED)
823                         break;
824
825                 if (u.uploading) {
826                         r = perform_upload(&u);
827                         if (r < 0)
828                                 break;
829                 }
830
831                 r = sd_event_run(u.events, u.timeout);
832                 if (r < 0) {
833                         log_error("Failed to run event loop: %s", strerror(-r));
834                         break;
835                 }
836         }
837
838 cleanup:
839         sd_notify(false,
840                   "STOPPING=1\n"
841                   "STATUS=Shutting down...");
842
843         destroy_uploader(&u);
844
845 finish:
846         return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
847 }