]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload.c
journal-upload: verify state file can be saved before uploading
[thirdparty/systemd.git] / src / journal-remote / journal-upload.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <stdio.h>
23 #include <curl/curl.h>
24 #include <sys/stat.h>
25 #include <fcntl.h>
26 #include <getopt.h>
27
28 #include "sd-daemon.h"
29
30 #include "log.h"
31 #include "util.h"
32 #include "build.h"
33 #include "fileio.h"
34 #include "mkdir.h"
35 #include "conf-parser.h"
36 #include "journal-upload.h"
37
38 #define PRIV_KEY_FILE CERTIFICATE_ROOT "/private/journal-upload.pem"
39 #define CERT_FILE CERTIFICATE_ROOT "/certs/journal-upload.pem"
40 #define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem"
41 #define DEFAULT_PORT 19532
42
43 static const char* arg_url;
44
45 static void close_fd_input(Uploader *u);
46
47 static const char *arg_key = NULL;
48 static const char *arg_cert = NULL;
49 static const char *arg_trust = NULL;
50
51 static const char *arg_directory = NULL;
52 static char **arg_file = NULL;
53 static const char *arg_cursor = NULL;
54 static bool arg_after_cursor = false;
55 static int arg_journal_type = 0;
56 static const char *arg_machine = NULL;
57 static bool arg_merge = false;
58 static int arg_follow = -1;
59 static const char *arg_save_state = NULL;
60
61 #define SERVER_ANSWER_KEEP 2048
62
63 #define STATE_FILE "/var/lib/systemd/journal-upload/state"
64
/* Set one libcurl option and, on failure, log at 'level' and run 'cmd'
 * (which may be empty, a 'return ...' or a 'goto ...').
 *
 * Expects a 'CURLcode code' variable in the calling scope.
 *
 * Wrapped in do { } while (0) so that the expansion is a single statement
 * and stays correct when used as the body of an unbraced if/else. */
#define easy_setopt(curl, opt, value, level, cmd)                       \
        do {                                                            \
                code = curl_easy_setopt(curl, opt, value);              \
                if (code) {                                             \
                        log_full(level,                                 \
                                 "curl_easy_setopt " #opt " failed: %s", \
                                 curl_easy_strerror(code));             \
                        cmd;                                            \
                }                                                       \
        } while (0)
75
76 static size_t output_callback(char *buf,
77 size_t size,
78 size_t nmemb,
79 void *userp) {
80 Uploader *u = userp;
81
82 assert(u);
83
84 log_debug("The server answers (%zu bytes): %.*s",
85 size*nmemb, (int)(size*nmemb), buf);
86
87 if (nmemb && !u->answer) {
88 u->answer = strndup(buf, size*nmemb);
89 if (!u->answer)
90 log_warning("Failed to store server answer (%zu bytes): %s",
91 size*nmemb, strerror(ENOMEM));
92 }
93
94 return size * nmemb;
95 }
96
97 static int check_cursor_updating(Uploader *u) {
98 _cleanup_free_ char *temp_path = NULL;
99 _cleanup_fclose_ FILE *f = NULL;
100 int r;
101
102 if (!u->state_file)
103 return 0;
104
105 r = mkdir_parents(u->state_file, 0755);
106 if (r < 0) {
107 log_error("Cannot create parent directory of state file %s: %s",
108 u->state_file, strerror(-r));
109 return r;
110 }
111
112 r = fopen_temporary(u->state_file, &f, &temp_path);
113 if (r < 0) {
114 log_error("Cannot save state to %s: %s",
115 u->state_file, strerror(-r));
116 return r;
117 }
118 unlink(temp_path);
119
120 return 0;
121 }
122
123 static int update_cursor_state(Uploader *u) {
124 _cleanup_free_ char *temp_path = NULL;
125 _cleanup_fclose_ FILE *f = NULL;
126 int r;
127
128 if (!u->state_file || !u->last_cursor)
129 return 0;
130
131 r = fopen_temporary(u->state_file, &f, &temp_path);
132 if (r < 0)
133 goto finish;
134
135 fprintf(f,
136 "# This is private data. Do not parse.\n"
137 "LAST_CURSOR=%s\n",
138 u->last_cursor);
139
140 fflush(f);
141
142 if (ferror(f) || rename(temp_path, u->state_file) < 0) {
143 r = -errno;
144 unlink(u->state_file);
145 unlink(temp_path);
146 }
147
148 finish:
149 if (r < 0)
150 log_error("Failed to save state %s: %s", u->state_file, strerror(-r));
151
152 return r;
153 }
154
155 static int load_cursor_state(Uploader *u) {
156 int r;
157
158 if (!u->state_file)
159 return 0;
160
161 r = parse_env_file(u->state_file, NEWLINE,
162 "LAST_CURSOR", &u->last_cursor,
163 NULL);
164
165 if (r < 0 && r != -ENOENT) {
166 log_error("Failed to read state file %s: %s",
167 u->state_file, strerror(-r));
168 return r;
169 }
170
171 return 0;
172 }
173
174
175
176 int start_upload(Uploader *u,
177 size_t (*input_callback)(void *ptr,
178 size_t size,
179 size_t nmemb,
180 void *userdata),
181 void *data) {
182 CURLcode code;
183
184 assert(u);
185 assert(input_callback);
186
187 if (!u->header) {
188 struct curl_slist *h;
189
190 h = curl_slist_append(NULL, "Content-Type: application/vnd.fdo.journal");
191 if (!h)
192 return log_oom();
193
194 h = curl_slist_append(h, "Transfer-Encoding: chunked");
195 if (!h) {
196 curl_slist_free_all(h);
197 return log_oom();
198 }
199
200 h = curl_slist_append(h, "Accept: text/plain");
201 if (!h) {
202 curl_slist_free_all(h);
203 return log_oom();
204 }
205
206 u->header = h;
207 }
208
209 if (!u->easy) {
210 CURL *curl;
211
212 curl = curl_easy_init();
213 if (!curl) {
214 log_error("Call to curl_easy_init failed.");
215 return -ENOSR;
216 }
217
218 /* tell it to POST to the URL */
219 easy_setopt(curl, CURLOPT_POST, 1L,
220 LOG_ERR, return -EXFULL);
221
222 easy_setopt(curl, CURLOPT_ERRORBUFFER, u->error,
223 LOG_ERR, return -EXFULL);
224
225 /* set where to write to */
226 easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback,
227 LOG_ERR, return -EXFULL);
228
229 easy_setopt(curl, CURLOPT_WRITEDATA, data,
230 LOG_ERR, return -EXFULL);
231
232 /* set where to read from */
233 easy_setopt(curl, CURLOPT_READFUNCTION, input_callback,
234 LOG_ERR, return -EXFULL);
235
236 easy_setopt(curl, CURLOPT_READDATA, data,
237 LOG_ERR, return -EXFULL);
238
239 /* use our special own mime type and chunked transfer */
240 easy_setopt(curl, CURLOPT_HTTPHEADER, u->header,
241 LOG_ERR, return -EXFULL);
242
243 /* enable verbose for easier tracing */
244 easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, );
245
246 easy_setopt(curl, CURLOPT_USERAGENT,
247 "systemd-journal-upload " PACKAGE_STRING,
248 LOG_WARNING, );
249
250 if (arg_key || startswith(u->url, "https://")) {
251 easy_setopt(curl, CURLOPT_SSLKEY, arg_key ?: PRIV_KEY_FILE,
252 LOG_ERR, return -EXFULL);
253 easy_setopt(curl, CURLOPT_SSLCERT, arg_cert ?: CERT_FILE,
254 LOG_ERR, return -EXFULL);
255 }
256
257 if (arg_trust || startswith(u->url, "https://"))
258 easy_setopt(curl, CURLOPT_CAINFO, arg_trust ?: TRUST_FILE,
259 LOG_ERR, return -EXFULL);
260
261 if (arg_key || arg_trust)
262 easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1,
263 LOG_WARNING, );
264
265 u->easy = curl;
266 } else {
267 /* truncate the potential old error message */
268 u->error[0] = '\0';
269
270 free(u->answer);
271 u->answer = 0;
272 }
273
274 /* upload to this place */
275 code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url);
276 if (code) {
277 log_error("curl_easy_setopt CURLOPT_URL failed: %s",
278 curl_easy_strerror(code));
279 return -EXFULL;
280 }
281
282 u->uploading = true;
283
284 return 0;
285 }
286
287 static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
288 Uploader *u = userp;
289
290 ssize_t r;
291
292 assert(u);
293 assert(nmemb <= SSIZE_MAX / size);
294
295 if (u->input < 0)
296 return 0;
297
298 r = read(u->input, buf, size * nmemb);
299 log_debug("%s: allowed %zu, read %zu", __func__, size*nmemb, r);
300
301 if (r > 0)
302 return r;
303
304 u->uploading = false;
305 if (r == 0) {
306 log_debug("Reached EOF");
307 close_fd_input(u);
308 return 0;
309 } else {
310 log_error("Aborting transfer after read error on input: %m.");
311 return CURL_READFUNC_ABORT;
312 }
313 }
314
315 static void close_fd_input(Uploader *u) {
316 assert(u);
317
318 if (u->input >= 0)
319 close_nointr(u->input);
320 u->input = -1;
321 u->timeout = 0;
322 }
323
324 static int dispatch_fd_input(sd_event_source *event,
325 int fd,
326 uint32_t revents,
327 void *userp) {
328 Uploader *u = userp;
329
330 assert(u);
331 assert(fd >= 0);
332
333 if (revents & EPOLLHUP) {
334 log_debug("Received HUP");
335 close_fd_input(u);
336 return 0;
337 }
338
339 if (!(revents & EPOLLIN)) {
340 log_warning("Unexpected poll event %"PRIu32".", revents);
341 return -EINVAL;
342 }
343
344 if (u->uploading) {
345 log_warning("dispatch_fd_input called when uploading, ignoring.");
346 return 0;
347 }
348
349 return start_upload(u, fd_input_callback, u);
350 }
351
352 static int open_file_for_upload(Uploader *u, const char *filename) {
353 int fd, r = 0;
354
355 if (streq(filename, "-"))
356 fd = STDIN_FILENO;
357 else {
358 fd = open(filename, O_RDONLY|O_CLOEXEC|O_NOCTTY);
359 if (fd < 0) {
360 log_error("Failed to open %s: %m", filename);
361 return -errno;
362 }
363 }
364
365 u->input = fd;
366
367 if (arg_follow) {
368 r = sd_event_add_io(u->events, &u->input_event,
369 fd, EPOLLIN, dispatch_fd_input, u);
370 if (r < 0) {
371 if (r != -EPERM || arg_follow > 0) {
372 log_error("Failed to register input event: %s", strerror(-r));
373 return r;
374 }
375
376 /* Normal files should just be consumed without polling. */
377 r = start_upload(u, fd_input_callback, u);
378 }
379 }
380
381 return r;
382 }
383
384 static int dispatch_sigterm(sd_event_source *event,
385 const struct signalfd_siginfo *si,
386 void *userdata) {
387 Uploader *u = userdata;
388
389 assert(u);
390
391 log_received_signal(LOG_INFO, si);
392
393 close_fd_input(u);
394 close_journal_input(u);
395
396 sd_event_exit(u->events, 0);
397 return 0;
398 }
399
400 static int setup_signals(Uploader *u) {
401 sigset_t mask;
402 int r;
403
404 assert(u);
405
406 assert_se(sigemptyset(&mask) == 0);
407 sigset_add_many(&mask, SIGINT, SIGTERM, -1);
408 assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);
409
410 r = sd_event_add_signal(u->events, &u->sigterm_event, SIGTERM, dispatch_sigterm, u);
411 if (r < 0)
412 return r;
413
414 r = sd_event_add_signal(u->events, &u->sigint_event, SIGINT, dispatch_sigterm, u);
415 if (r < 0)
416 return r;
417
418 return 0;
419 }
420
421 static int setup_uploader(Uploader *u, const char *url, const char *state_file) {
422 int r;
423 const char *host, *proto = "";
424
425 assert(u);
426 assert(url);
427
428 memzero(u, sizeof(Uploader));
429 u->input = -1;
430
431 if (!(host = startswith(url, "http://")) && !(host = startswith(url, "https://"))) {
432 host = url;
433 proto = "https://";
434 }
435
436 if (strchr(host, ':'))
437 u->url = strjoin(proto, url, "/upload", NULL);
438 else {
439 char *t;
440 size_t x;
441
442 t = strdupa(url);
443 x = strlen(t);
444 while (x > 0 && t[x - 1] == '/')
445 t[x - 1] = '\0';
446
447 u->url = strjoin(proto, t, ":" STRINGIFY(DEFAULT_PORT), "/upload", NULL);
448 }
449 if (!u->url)
450 return log_oom();
451
452 u->state_file = state_file;
453
454 r = sd_event_default(&u->events);
455 if (r < 0) {
456 log_error("sd_event_default failed: %s", strerror(-r));
457 return r;
458 }
459
460 r = setup_signals(u);
461 if (r < 0) {
462 log_error("Failed to set up signals: %s", strerror(-r));
463 return r;
464 }
465
466 return load_cursor_state(u);
467 }
468
469 static void destroy_uploader(Uploader *u) {
470 assert(u);
471
472 curl_easy_cleanup(u->easy);
473 curl_slist_free_all(u->header);
474 free(u->answer);
475
476 free(u->last_cursor);
477 free(u->current_cursor);
478
479 free(u->url);
480
481 u->input_event = sd_event_source_unref(u->input_event);
482
483 close_fd_input(u);
484 close_journal_input(u);
485
486 sd_event_source_unref(u->sigterm_event);
487 sd_event_source_unref(u->sigint_event);
488 sd_event_unref(u->events);
489 }
490
491 static int perform_upload(Uploader *u) {
492 CURLcode code;
493 long status;
494
495 assert(u);
496
497 code = curl_easy_perform(u->easy);
498 if (code) {
499 log_error("Upload to %s failed: %.*s",
500 u->url,
501 u->error[0] ? (int) sizeof(u->error) : INT_MAX,
502 u->error[0] ? u->error : curl_easy_strerror(code));
503 return -EIO;
504 }
505
506 code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status);
507 if (code) {
508 log_error("Failed to retrieve response code: %s",
509 curl_easy_strerror(code));
510 return -EUCLEAN;
511 }
512
513 if (status >= 300) {
514 log_error("Upload to %s failed with code %lu: %s",
515 u->url, status, strna(u->answer));
516 return -EIO;
517 } else if (status < 200) {
518 log_error("Upload to %s finished with unexpected code %lu: %s",
519 u->url, status, strna(u->answer));
520 return -EIO;
521 } else
522 log_debug("Upload finished successfully with code %lu: %s",
523 status, strna(u->answer));
524
525 free(u->last_cursor);
526 u->last_cursor = u->current_cursor;
527 u->current_cursor = NULL;
528
529 return update_cursor_state(u);
530 }
531
532 static int parse_config(void) {
533 const ConfigTableItem items[] = {
534 { "Upload", "URL", config_parse_string, 0, &arg_url },
535 { "Upload", "ServerKeyFile", config_parse_path, 0, &arg_key },
536 { "Upload", "ServerCertificateFile", config_parse_path, 0, &arg_cert },
537 { "Upload", "TrustedCertificateFile", config_parse_path, 0, &arg_trust },
538 {}};
539
540 return config_parse(NULL, PKGSYSCONFDIR "/journal-upload.conf", NULL,
541 "Upload\0",
542 config_item_table_lookup, items,
543 false, false, true, NULL);
544 }
545
546 static void help(void) {
547 printf("%s -u URL {FILE|-}...\n\n"
548 "Upload journal events to a remote server.\n\n"
549 " -h --help Show this help\n"
550 " --version Show package version\n"
551 " -u --url=URL Upload to this address (default port "
552 STRINGIFY(DEFAULT_PORT) ")\n"
553 " --key=FILENAME Specify key in PEM format (default:\n"
554 " \"" PRIV_KEY_FILE "\")\n"
555 " --cert=FILENAME Specify certificate in PEM format (default:\n"
556 " \"" CERT_FILE "\")\n"
557 " --trust=FILENAME|all Specify CA certificate or disable checking (default:\n"
558 " \"" TRUST_FILE "\")\n"
559 " --system Use the system journal\n"
560 " --user Use the user journal for the current user\n"
561 " -m --merge Use all available journals\n"
562 " -M --machine=CONTAINER Operate on local container\n"
563 " -D --directory=PATH Use journal files from directory\n"
564 " --file=PATH Use this journal file\n"
565 " --cursor=CURSOR Start at the specified cursor\n"
566 " --after-cursor=CURSOR Start after the specified cursor\n"
567 " --follow[=BOOL] Do [not] wait for input\n"
568 " --save-state[=FILE] Save uploaded cursors (default \n"
569 " " STATE_FILE ")\n"
570 " -h --help Show this help and exit\n"
571 " --version Print version string and exit\n"
572 , program_invocation_short_name);
573 }
574
575 static int parse_argv(int argc, char *argv[]) {
576 enum {
577 ARG_VERSION = 0x100,
578 ARG_KEY,
579 ARG_CERT,
580 ARG_TRUST,
581 ARG_USER,
582 ARG_SYSTEM,
583 ARG_FILE,
584 ARG_CURSOR,
585 ARG_AFTER_CURSOR,
586 ARG_FOLLOW,
587 ARG_SAVE_STATE,
588 };
589
590 static const struct option options[] = {
591 { "help", no_argument, NULL, 'h' },
592 { "version", no_argument, NULL, ARG_VERSION },
593 { "url", required_argument, NULL, 'u' },
594 { "key", required_argument, NULL, ARG_KEY },
595 { "cert", required_argument, NULL, ARG_CERT },
596 { "trust", required_argument, NULL, ARG_TRUST },
597 { "system", no_argument, NULL, ARG_SYSTEM },
598 { "user", no_argument, NULL, ARG_USER },
599 { "merge", no_argument, NULL, 'm' },
600 { "machine", required_argument, NULL, 'M' },
601 { "directory", required_argument, NULL, 'D' },
602 { "file", required_argument, NULL, ARG_FILE },
603 { "cursor", required_argument, NULL, ARG_CURSOR },
604 { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR },
605 { "follow", optional_argument, NULL, ARG_FOLLOW },
606 { "save-state", optional_argument, NULL, ARG_SAVE_STATE },
607 {}
608 };
609
610 int c, r;
611
612 assert(argc >= 0);
613 assert(argv);
614
615 opterr = 0;
616
617 while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0)
618 switch(c) {
619 case 'h':
620 help();
621 return 0 /* done */;
622
623 case ARG_VERSION:
624 puts(PACKAGE_STRING);
625 puts(SYSTEMD_FEATURES);
626 return 0 /* done */;
627
628 case 'u':
629 if (arg_url) {
630 log_error("cannot use more than one --url");
631 return -EINVAL;
632 }
633
634 arg_url = optarg;
635 break;
636
637 case ARG_KEY:
638 if (arg_key) {
639 log_error("cannot use more than one --key");
640 return -EINVAL;
641 }
642
643 arg_key = optarg;
644 break;
645
646 case ARG_CERT:
647 if (arg_cert) {
648 log_error("cannot use more than one --cert");
649 return -EINVAL;
650 }
651
652 arg_cert = optarg;
653 break;
654
655 case ARG_TRUST:
656 if (arg_trust) {
657 log_error("cannot use more than one --trust");
658 return -EINVAL;
659 }
660
661 arg_trust = optarg;
662 break;
663
664 case ARG_SYSTEM:
665 arg_journal_type |= SD_JOURNAL_SYSTEM;
666 break;
667
668 case ARG_USER:
669 arg_journal_type |= SD_JOURNAL_CURRENT_USER;
670 break;
671
672 case 'm':
673 arg_merge = true;
674 break;
675
676 case 'M':
677 if (arg_machine) {
678 log_error("cannot use more than one --machine/-M");
679 return -EINVAL;
680 }
681
682 arg_machine = optarg;
683 break;
684
685 case 'D':
686 if (arg_directory) {
687 log_error("cannot use more than one --directory/-D");
688 return -EINVAL;
689 }
690
691 arg_directory = optarg;
692 break;
693
694 case ARG_FILE:
695 r = glob_extend(&arg_file, optarg);
696 if (r < 0) {
697 log_error("Failed to add paths: %s", strerror(-r));
698 return r;
699 };
700 break;
701
702 case ARG_CURSOR:
703 if (arg_cursor) {
704 log_error("cannot use more than one --cursor/--after-cursor");
705 return -EINVAL;
706 }
707
708 arg_cursor = optarg;
709 break;
710
711 case ARG_AFTER_CURSOR:
712 if (arg_cursor) {
713 log_error("cannot use more than one --cursor/--after-cursor");
714 return -EINVAL;
715 }
716
717 arg_cursor = optarg;
718 arg_after_cursor = true;
719 break;
720
721 case ARG_FOLLOW:
722 if (optarg) {
723 r = parse_boolean(optarg);
724 if (r < 0) {
725 log_error("Failed to parse --follow= parameter.");
726 return -EINVAL;
727 }
728
729 arg_follow = !!r;
730 } else
731 arg_follow = true;
732
733 break;
734
735 case ARG_SAVE_STATE:
736 arg_save_state = optarg ?: STATE_FILE;
737 break;
738
739 case '?':
740 log_error("Unknown option %s.", argv[optind-1]);
741 return -EINVAL;
742
743 case ':':
744 log_error("Missing argument to %s.", argv[optind-1]);
745 return -EINVAL;
746
747 default:
748 assert_not_reached("Unhandled option code.");
749 }
750
751 if (!arg_url) {
752 log_error("Required --url/-u option missing.");
753 return -EINVAL;
754 }
755
756 if (!!arg_key != !!arg_cert) {
757 log_error("Options --key and --cert must be used together.");
758 return -EINVAL;
759 }
760
761 if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type)) {
762 log_error("Input arguments make no sense with journal input.");
763 return -EINVAL;
764 }
765
766 return 1;
767 }
768
769 static int open_journal(sd_journal **j) {
770 int r;
771
772 if (arg_directory)
773 r = sd_journal_open_directory(j, arg_directory, arg_journal_type);
774 else if (arg_file)
775 r = sd_journal_open_files(j, (const char**) arg_file, 0);
776 else if (arg_machine)
777 r = sd_journal_open_container(j, arg_machine, 0);
778 else
779 r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type);
780 if (r < 0)
781 log_error("Failed to open %s: %s",
782 arg_directory ? arg_directory : arg_file ? "files" : "journal",
783 strerror(-r));
784 return r;
785 }
786
787 int main(int argc, char **argv) {
788 Uploader u;
789 int r;
790 bool use_journal;
791
792 log_show_color(true);
793 log_parse_environment();
794
795 r = parse_config();
796 if (r < 0)
797 goto finish;
798
799 r = parse_argv(argc, argv);
800 if (r <= 0)
801 goto finish;
802
803 r = setup_uploader(&u, arg_url, arg_save_state);
804 if (r < 0)
805 goto cleanup;
806
807 sd_event_set_watchdog(u.events, true);
808
809 r = check_cursor_updating(&u);
810 if (r < 0)
811 goto cleanup;
812
813 log_debug("%s running as pid "PID_FMT,
814 program_invocation_short_name, getpid());
815
816 use_journal = optind >= argc;
817 if (use_journal) {
818 sd_journal *j;
819 r = open_journal(&j);
820 if (r < 0)
821 goto finish;
822 r = open_journal_for_upload(&u, j,
823 arg_cursor ?: u.last_cursor,
824 arg_cursor ? arg_after_cursor : true,
825 !!arg_follow);
826 if (r < 0)
827 goto finish;
828 }
829
830 sd_notify(false,
831 "READY=1\n"
832 "STATUS=Processing input...");
833
834 while (true) {
835 if (use_journal) {
836 if (!u.journal)
837 break;
838
839 r = check_journal_input(&u);
840 } else if (u.input < 0 && !use_journal) {
841 if (optind >= argc)
842 break;
843
844 log_debug("Using %s as input.", argv[optind]);
845 r = open_file_for_upload(&u, argv[optind++]);
846 }
847 if (r < 0)
848 goto cleanup;
849
850 r = sd_event_get_state(u.events);
851 if (r < 0)
852 break;
853 if (r == SD_EVENT_FINISHED)
854 break;
855
856 if (u.uploading) {
857 r = perform_upload(&u);
858 if (r < 0)
859 break;
860 }
861
862 r = sd_event_run(u.events, u.timeout);
863 if (r < 0) {
864 log_error("Failed to run event loop: %s", strerror(-r));
865 break;
866 }
867 }
868
869 cleanup:
870 sd_notify(false,
871 "STOPPING=1\n"
872 "STATUS=Shutting down...");
873
874 destroy_uploader(&u);
875
876 finish:
877 return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
878 }