1 /* SPDX-License-Identifier: LGPL-2.1+ */
3 This file is part of systemd.
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7 systemd is free software; you can redistribute it and/or modify it
8 under the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 2.1 of the License, or
10 (at your option) any later version.
12 systemd is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with systemd; If not, see <http://www.gnu.org/licenses/>.
21 #include <curl/curl.h>
24 #include "alloc-util.h"
25 #include "journal-upload.h"
29 #include "sd-daemon.h"
32 * Write up to size bytes to buf. Return negative on error, and number of
33 * bytes written otherwise. The last case is a kind of an error too.
35 static ssize_t
write_entry(char *buf
, size_t size
, Uploader
*u
) {
39 assert(size
<= SSIZE_MAX
);
43 switch(u
->entry_state
) {
45 u
->current_cursor
= mfree(u
->current_cursor
);
47 r
= sd_journal_get_cursor(u
->journal
, &u
->current_cursor
);
49 return log_error_errno(r
, "Failed to get cursor: %m");
51 r
= snprintf(buf
+ pos
, size
- pos
,
52 "__CURSOR=%s\n", u
->current_cursor
);
54 /* not enough space */
59 if (pos
+ r
== size
) {
60 /* exactly one character short, but we don't need it */
68 case ENTRY_REALTIME
: {
71 r
= sd_journal_get_realtime_usec(u
->journal
, &realtime
);
73 return log_error_errno(r
, "Failed to get realtime timestamp: %m");
75 r
= snprintf(buf
+ pos
, size
- pos
,
76 "__REALTIME_TIMESTAMP="USEC_FMT
"\n", realtime
);
78 /* not enough space */
83 if (r
+ pos
== size
) {
84 /* exactly one character short, but we don't need it */
92 case ENTRY_MONOTONIC
: {
96 r
= sd_journal_get_monotonic_usec(u
->journal
, &monotonic
, &boot_id
);
98 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
100 r
= snprintf(buf
+ pos
, size
- pos
,
101 "__MONOTONIC_TIMESTAMP="USEC_FMT
"\n", monotonic
);
103 /* not enough space */
108 if (r
+ pos
== size
) {
109 /* exactly one character short, but we don't need it */
110 buf
[size
- 1] = '\n';
117 case ENTRY_BOOT_ID
: {
121 r
= sd_journal_get_monotonic_usec(u
->journal
, NULL
, &boot_id
);
123 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
125 r
= snprintf(buf
+ pos
, size
- pos
,
126 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id
, sid
));
128 /* not enough space */
133 if (r
+ pos
== size
) {
134 /* exactly one character short, but we don't need it */
135 buf
[size
- 1] = '\n';
142 case ENTRY_NEW_FIELD
: {
145 r
= sd_journal_enumerate_data(u
->journal
,
149 return log_error_errno(r
, "Failed to move to next field in entry: %m");
151 u
->entry_state
= ENTRY_OUTRO
;
155 if (!utf8_is_printable_newline(u
->field_data
,
156 u
->field_length
, false)) {
157 u
->entry_state
= ENTRY_BINARY_FIELD_START
;
164 case ENTRY_TEXT_FIELD
:
165 case ENTRY_BINARY_FIELD
: {
169 done
= size
- pos
> u
->field_length
- u
->field_pos
;
171 tocopy
= u
->field_length
- u
->field_pos
;
176 (char*) u
->field_data
+ u
->field_pos
,
180 buf
[pos
+ tocopy
] = '\n';
182 u
->entry_state
= ENTRY_NEW_FIELD
;
185 u
->field_pos
+= tocopy
;
190 case ENTRY_BINARY_FIELD_START
: {
194 c
= memchr(u
->field_data
, '=', u
->field_length
);
195 if (!c
|| c
== u
->field_data
) {
196 log_error("Invalid field.");
200 len
= c
- (const char*)u
->field_data
;
202 /* need space for label + '\n' */
203 if (size
- pos
< len
+ 1)
206 memcpy(buf
+ pos
, u
->field_data
, len
);
207 buf
[pos
+ len
] = '\n';
210 u
->field_pos
= len
+ 1;
214 case ENTRY_BINARY_FIELD_SIZE
: {
217 /* need space for uint64_t */
221 le64
= htole64(u
->field_length
- u
->field_pos
);
222 memcpy(buf
+ pos
, &le64
, 8);
230 /* need space for '\n' */
241 assert_not_reached("WTF?");
244 assert_not_reached("WTF?");
247 static inline void check_update_watchdog(Uploader
*u
) {
251 if (u
->watchdog_usec
<= 0)
254 after
= now(CLOCK_MONOTONIC
);
255 elapsed_time
= usec_sub_unsigned(after
, u
->watchdog_timestamp
);
256 if (elapsed_time
> u
->watchdog_usec
/ 2) {
257 log_debug("Update watchdog timer");
258 sd_notify(false, "WATCHDOG=1");
259 u
->watchdog_timestamp
= after
;
263 static size_t journal_input_callback(void *buf
, size_t size
, size_t nmemb
, void *userp
) {
271 assert(nmemb
<= SSIZE_MAX
/ size
);
273 check_update_watchdog(u
);
277 while (j
&& filled
< size
* nmemb
) {
278 if (u
->entry_state
== ENTRY_DONE
) {
279 r
= sd_journal_next(j
);
281 log_error_errno(r
, "Failed to move to next entry in journal: %m");
282 return CURL_READFUNC_ABORT
;
285 log_debug("No more entries, waiting for journal.");
287 log_info("No more entries, closing journal.");
288 close_journal_input(u
);
291 u
->uploading
= false;
296 u
->entry_state
= ENTRY_CURSOR
;
299 w
= write_entry((char*)buf
+ filled
, size
* nmemb
- filled
, u
);
301 return CURL_READFUNC_ABORT
;
305 log_error("Buffer space is too small to write entry.");
306 return CURL_READFUNC_ABORT
;
307 } else if (u
->entry_state
!= ENTRY_DONE
)
308 /* This means that all available space was used up */
311 log_debug("Entry %zu (%s) has been uploaded.",
312 u
->entries_sent
, u
->current_cursor
);
318 void close_journal_input(Uploader
*u
) {
322 log_debug("Closing journal input.");
324 sd_journal_close(u
->journal
);
330 static int process_journal_input(Uploader
*u
, int skip
) {
336 r
= sd_journal_next_skip(u
->journal
, skip
);
338 return log_error_errno(r
, "Failed to skip to next entry: %m");
343 u
->entry_state
= ENTRY_CURSOR
;
344 return start_upload(u
, journal_input_callback
, u
);
347 int check_journal_input(Uploader
*u
) {
348 if (u
->input_event
) {
351 r
= sd_journal_process(u
->journal
);
353 log_error_errno(r
, "Failed to process journal: %m");
354 close_journal_input(u
);
358 if (r
== SD_JOURNAL_NOP
)
362 return process_journal_input(u
, 1);
365 static int dispatch_journal_input(sd_event_source
*event
,
376 log_debug("Detected journal input, checking for new data.");
377 return check_journal_input(u
);
380 int open_journal_for_upload(Uploader
*u
,
389 sd_journal_set_data_threshold(j
, 0);
392 fd
= sd_journal_get_fd(j
);
394 return log_error_errno(fd
, "sd_journal_get_fd failed: %m");
396 events
= sd_journal_get_events(j
);
398 r
= sd_journal_reliable_fd(j
);
403 u
->timeout
= JOURNAL_UPLOAD_POLL_TIMEOUT
;
405 r
= sd_event_add_io(u
->events
, &u
->input_event
,
406 fd
, events
, dispatch_journal_input
, u
);
408 return log_error_errno(r
, "Failed to register input event: %m");
410 log_debug("Listening for journal events on fd:%d, timeout %d",
411 fd
, u
->timeout
== (uint64_t) -1 ? -1 : (int) u
->timeout
);
413 log_debug("Not listening for journal events.");
416 r
= sd_journal_seek_cursor(j
, cursor
);
418 return log_error_errno(r
, "Failed to seek to cursor %s: %m",
422 return process_journal_input(u
, 1 + !!after_cursor
);