1 /* SPDX-License-Identifier: LGPL-2.1+ */
8 #include "alloc-util.h"
9 #include "journal-upload.h"
11 #include "string-util.h"
16 * Write up to size bytes to buf. Return negative on error, and number of
17 * bytes written otherwise. The last case is a kind of an error too.
19 static ssize_t
write_entry(char *buf
, size_t size
, Uploader
*u
) {
23 assert(size
<= SSIZE_MAX
);
27 switch(u
->entry_state
) {
29 u
->current_cursor
= mfree(u
->current_cursor
);
31 r
= sd_journal_get_cursor(u
->journal
, &u
->current_cursor
);
33 return log_error_errno(r
, "Failed to get cursor: %m");
35 r
= snprintf(buf
+ pos
, size
- pos
,
36 "__CURSOR=%s\n", u
->current_cursor
);
38 if ((size_t) r
> size
- pos
)
39 /* not enough space */
44 if (pos
+ r
== size
) {
45 /* exactly one character short, but we don't need it */
53 case ENTRY_REALTIME
: {
56 r
= sd_journal_get_realtime_usec(u
->journal
, &realtime
);
58 return log_error_errno(r
, "Failed to get realtime timestamp: %m");
60 r
= snprintf(buf
+ pos
, size
- pos
,
61 "__REALTIME_TIMESTAMP="USEC_FMT
"\n", realtime
);
63 if ((size_t) r
> size
- pos
)
64 /* not enough space */
69 if (r
+ pos
== size
) {
70 /* exactly one character short, but we don't need it */
78 case ENTRY_MONOTONIC
: {
82 r
= sd_journal_get_monotonic_usec(u
->journal
, &monotonic
, &boot_id
);
84 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
86 r
= snprintf(buf
+ pos
, size
- pos
,
87 "__MONOTONIC_TIMESTAMP="USEC_FMT
"\n", monotonic
);
89 if ((size_t) r
> size
- pos
)
90 /* not enough space */
95 if (r
+ pos
== size
) {
96 /* exactly one character short, but we don't need it */
104 case ENTRY_BOOT_ID
: {
108 r
= sd_journal_get_monotonic_usec(u
->journal
, NULL
, &boot_id
);
110 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
112 r
= snprintf(buf
+ pos
, size
- pos
,
113 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id
, sid
));
115 if ((size_t) r
> size
- pos
)
116 /* not enough space */
121 if (r
+ pos
== size
) {
122 /* exactly one character short, but we don't need it */
123 buf
[size
- 1] = '\n';
130 case ENTRY_NEW_FIELD
: {
133 r
= sd_journal_enumerate_data(u
->journal
,
137 return log_error_errno(r
, "Failed to move to next field in entry: %m");
139 u
->entry_state
= ENTRY_OUTRO
;
143 /* We already printed the boot id from the data in
144 * the header, hence let's suppress it here */
145 if (memory_startswith(u
->field_data
, u
->field_length
, "_BOOT_ID="))
148 if (!utf8_is_printable_newline(u
->field_data
, u
->field_length
, false)) {
149 u
->entry_state
= ENTRY_BINARY_FIELD_START
;
156 case ENTRY_TEXT_FIELD
:
157 case ENTRY_BINARY_FIELD
: {
161 done
= size
- pos
> u
->field_length
- u
->field_pos
;
163 tocopy
= u
->field_length
- u
->field_pos
;
168 (char*) u
->field_data
+ u
->field_pos
,
172 buf
[pos
+ tocopy
] = '\n';
174 u
->entry_state
= ENTRY_NEW_FIELD
;
177 u
->field_pos
+= tocopy
;
182 case ENTRY_BINARY_FIELD_START
: {
186 c
= memchr(u
->field_data
, '=', u
->field_length
);
187 if (!c
|| c
== u
->field_data
)
188 return log_error_errno(SYNTHETIC_ERRNO(EINVAL
),
191 len
= c
- (const char*)u
->field_data
;
193 /* need space for label + '\n' */
194 if (size
- pos
< len
+ 1)
197 memcpy(buf
+ pos
, u
->field_data
, len
);
198 buf
[pos
+ len
] = '\n';
201 u
->field_pos
= len
+ 1;
205 case ENTRY_BINARY_FIELD_SIZE
: {
208 /* need space for uint64_t */
212 le64
= htole64(u
->field_length
- u
->field_pos
);
213 memcpy(buf
+ pos
, &le64
, 8);
221 /* need space for '\n' */
232 assert_not_reached("WTF?");
235 assert_not_reached("WTF?");
238 static inline void check_update_watchdog(Uploader
*u
) {
242 if (u
->watchdog_usec
<= 0)
245 after
= now(CLOCK_MONOTONIC
);
246 elapsed_time
= usec_sub_unsigned(after
, u
->watchdog_timestamp
);
247 if (elapsed_time
> u
->watchdog_usec
/ 2) {
248 log_debug("Update watchdog timer");
249 sd_notify(false, "WATCHDOG=1");
250 u
->watchdog_timestamp
= after
;
254 static size_t journal_input_callback(void *buf
, size_t size
, size_t nmemb
, void *userp
) {
262 assert(nmemb
<= SSIZE_MAX
/ size
);
264 check_update_watchdog(u
);
268 while (j
&& filled
< size
* nmemb
) {
269 if (u
->entry_state
== ENTRY_DONE
) {
270 r
= sd_journal_next(j
);
272 log_error_errno(r
, "Failed to move to next entry in journal: %m");
273 return CURL_READFUNC_ABORT
;
276 log_debug("No more entries, waiting for journal.");
278 log_info("No more entries, closing journal.");
279 close_journal_input(u
);
282 u
->uploading
= false;
287 u
->entry_state
= ENTRY_CURSOR
;
290 w
= write_entry((char*)buf
+ filled
, size
* nmemb
- filled
, u
);
292 return CURL_READFUNC_ABORT
;
296 log_error("Buffer space is too small to write entry.");
297 return CURL_READFUNC_ABORT
;
298 } else if (u
->entry_state
!= ENTRY_DONE
)
299 /* This means that all available space was used up */
302 log_debug("Entry %zu (%s) has been uploaded.",
303 u
->entries_sent
, u
->current_cursor
);
309 void close_journal_input(Uploader
*u
) {
313 log_debug("Closing journal input.");
315 sd_journal_close(u
->journal
);
321 static int process_journal_input(Uploader
*u
, int skip
) {
327 r
= sd_journal_next_skip(u
->journal
, skip
);
329 return log_error_errno(r
, "Failed to skip to next entry: %m");
334 u
->entry_state
= ENTRY_CURSOR
;
335 return start_upload(u
, journal_input_callback
, u
);
338 int check_journal_input(Uploader
*u
) {
339 if (u
->input_event
) {
342 r
= sd_journal_process(u
->journal
);
344 log_error_errno(r
, "Failed to process journal: %m");
345 close_journal_input(u
);
349 if (r
== SD_JOURNAL_NOP
)
353 return process_journal_input(u
, 1);
356 static int dispatch_journal_input(sd_event_source
*event
,
367 log_debug("Detected journal input, checking for new data.");
368 return check_journal_input(u
);
371 int open_journal_for_upload(Uploader
*u
,
380 sd_journal_set_data_threshold(j
, 0);
383 fd
= sd_journal_get_fd(j
);
385 return log_error_errno(fd
, "sd_journal_get_fd failed: %m");
387 events
= sd_journal_get_events(j
);
389 r
= sd_journal_reliable_fd(j
);
394 u
->timeout
= JOURNAL_UPLOAD_POLL_TIMEOUT
;
396 r
= sd_event_add_io(u
->events
, &u
->input_event
,
397 fd
, events
, dispatch_journal_input
, u
);
399 return log_error_errno(r
, "Failed to register input event: %m");
401 log_debug("Listening for journal events on fd:%d, timeout %d",
402 fd
, u
->timeout
== (uint64_t) -1 ? -1 : (int) u
->timeout
);
404 log_debug("Not listening for journal events.");
407 r
= sd_journal_seek_cursor(j
, cursor
);
409 return log_error_errno(r
, "Failed to seek to cursor %s: %m",
413 return process_journal_input(u
, 1 + !!after_cursor
);