8 #include "journal-upload.h"
11 * Write up to size bytes to buf. Return negative on error, and number of
12 * bytes written otherwise. The last case is a kind of an error too.
14 static ssize_t
write_entry(char *buf
, size_t size
, Uploader
*u
) {
18 assert(size
<= SSIZE_MAX
);
22 switch(u
->entry_state
) {
24 free(u
->current_cursor
);
25 u
->current_cursor
= NULL
;
27 r
= sd_journal_get_cursor(u
->journal
, &u
->current_cursor
);
29 log_error("Failed to get cursor: %s", strerror(-r
));
33 r
= snprintf(buf
+ pos
, size
- pos
,
34 "__CURSOR=%s\n", u
->current_cursor
);
36 /* not enough space */
41 if (pos
+ r
== size
) {
42 /* exactly one character short, but we don't need it */
50 case ENTRY_REALTIME
: {
53 r
= sd_journal_get_realtime_usec(u
->journal
, &realtime
);
55 log_error("Failed to get realtime timestamp: %s", strerror(-r
));
59 r
= snprintf(buf
+ pos
, size
- pos
,
60 "__REALTIME_TIMESTAMP="USEC_FMT
"\n", realtime
);
62 /* not enough space */
67 if (r
+ pos
== size
) {
68 /* exactly one character short, but we don't need it */
76 case ENTRY_MONOTONIC
: {
80 r
= sd_journal_get_monotonic_usec(u
->journal
, &monotonic
, &boot_id
);
82 log_error("Failed to get monotonic timestamp: %s", strerror(-r
));
86 r
= snprintf(buf
+ pos
, size
- pos
,
87 "__MONOTONIC_TIMESTAMP="USEC_FMT
"\n", monotonic
);
89 /* not enough space */
94 if (r
+ pos
== size
) {
95 /* exactly one character short, but we don't need it */
103 case ENTRY_BOOT_ID
: {
107 r
= sd_journal_get_monotonic_usec(u
->journal
, NULL
, &boot_id
);
109 log_error("Failed to get monotonic timestamp: %s", strerror(-r
));
113 r
= snprintf(buf
+ pos
, size
- pos
,
114 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id
, sid
));
116 /* not enough space */
121 if (r
+ pos
== size
) {
122 /* exactly one character short, but we don't need it */
123 buf
[size
- 1] = '\n';
130 case ENTRY_NEW_FIELD
: {
133 r
= sd_journal_enumerate_data(u
->journal
,
137 log_error("Failed to move to next field in entry: %s",
141 u
->entry_state
= ENTRY_OUTRO
;
145 if (!utf8_is_printable_newline(u
->field_data
,
146 u
->field_length
, false)) {
147 u
->entry_state
= ENTRY_BINARY_FIELD_START
;
154 case ENTRY_TEXT_FIELD
:
155 case ENTRY_BINARY_FIELD
: {
159 done
= size
- pos
> u
->field_length
- u
->field_pos
;
161 tocopy
= u
->field_length
- u
->field_pos
;
166 (char*) u
->field_data
+ u
->field_pos
,
170 buf
[pos
+ tocopy
] = '\n';
172 u
->entry_state
= ENTRY_NEW_FIELD
;
175 u
->field_pos
+= tocopy
;
180 case ENTRY_BINARY_FIELD_START
: {
184 c
= memchr(u
->field_data
, '=', u
->field_length
);
185 if (!c
|| c
== u
->field_data
) {
186 log_error("Invalid field.");
190 len
= c
- (const char*)u
->field_data
;
192 /* need space for label + '\n' */
193 if (size
- pos
< len
+ 1)
196 memcpy(buf
+ pos
, u
->field_data
, len
);
197 buf
[pos
+ len
] = '\n';
200 u
->field_pos
= len
+ 1;
204 case ENTRY_BINARY_FIELD_SIZE
: {
207 /* need space for uint64_t */
211 le64
= htole64(u
->field_length
- u
->field_pos
);
212 memcpy(buf
+ pos
, &le64
, 8);
220 /* need space for '\n' */
231 assert_not_reached("WTF?");
234 assert_not_reached("WTF?");
237 static size_t journal_input_callback(void *buf
, size_t size
, size_t nmemb
, void *userp
) {
245 assert(nmemb
<= SSIZE_MAX
/ size
);
249 while (j
&& filled
< size
* nmemb
) {
250 if (u
->entry_state
== ENTRY_DONE
) {
251 r
= sd_journal_next(j
);
253 log_error("Failed to move to next entry in journal: %s",
255 return CURL_READFUNC_ABORT
;
258 log_debug("No more entries, waiting for journal.");
260 log_info("No more entries, closing journal.");
261 close_journal_input(u
);
264 u
->uploading
= false;
269 u
->entry_state
= ENTRY_CURSOR
;
272 w
= write_entry((char*)buf
+ filled
, size
* nmemb
- filled
, u
);
274 return CURL_READFUNC_ABORT
;
278 log_error("Buffer space is too small to write entry.");
279 return CURL_READFUNC_ABORT
;
280 } else if (u
->entry_state
!= ENTRY_DONE
)
281 /* This means that all available space was used up */
284 log_debug("Entry %zu (%s) has been uploaded.",
285 u
->entries_sent
, u
->current_cursor
);
291 void close_journal_input(Uploader
*u
) {
295 log_debug("Closing journal input.");
297 sd_journal_close(u
->journal
);
303 static int process_journal_input(Uploader
*u
, int skip
) {
306 r
= sd_journal_next_skip(u
->journal
, skip
);
308 log_error("Failed to skip to next entry: %s", strerror(-r
));
314 u
->entry_state
= ENTRY_CURSOR
;
315 return start_upload(u
, journal_input_callback
, u
);
318 int check_journal_input(Uploader
*u
) {
319 if (u
->input_event
) {
322 r
= sd_journal_process(u
->journal
);
324 log_error("Failed to process journal: %s", strerror(-r
));
325 close_journal_input(u
);
329 if (r
== SD_JOURNAL_NOP
)
333 return process_journal_input(u
, 1);
336 static int dispatch_journal_input(sd_event_source
*event
,
345 log_warning("dispatch_journal_input called when uploading, ignoring.");
349 log_debug("Detected journal input, checking for new data.");
350 return check_journal_input(u
);
353 int open_journal_for_upload(Uploader
*u
,
362 sd_journal_set_data_threshold(j
, 0);
365 fd
= sd_journal_get_fd(j
);
367 log_error("sd_journal_get_fd failed: %s", strerror(-fd
));
371 events
= sd_journal_get_events(j
);
373 r
= sd_journal_reliable_fd(j
);
378 u
->timeout
= JOURNAL_UPLOAD_POLL_TIMEOUT
;
380 r
= sd_event_add_io(u
->events
, &u
->input_event
,
381 fd
, events
, dispatch_journal_input
, u
);
383 log_error("Failed to register input event: %s", strerror(-r
));
387 log_debug("Listening for journal events on fd:%d, timeout %d",
388 fd
, u
->timeout
== (uint64_t) -1 ? -1 : (int) u
->timeout
);
390 log_debug("Not listening for journal events.");
393 r
= sd_journal_seek_cursor(j
, cursor
);
395 log_error("Failed to seek to cursor %s: %s",
396 cursor
, strerror(-r
));
401 return process_journal_input(u
, 1 + !!after_cursor
);