1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
24 #include <curl/curl.h>
26 #include "alloc-util.h"
27 #include "journal-upload.h"
33 * Write up to size bytes to buf. Return negative on error, and number of
34 * bytes written otherwise. The last case is a kind of an error too.
/*
 * write_entry() - serialize the next part of the current journal entry of
 * u->journal into buf.
 *
 * The function switches on u->entry_state and appends output to buf at the
 * running offset pos. The pieces visible here are a "__CURSOR=" line, a
 * "__REALTIME_TIMESTAMP=" line, a "__MONOTONIC_TIMESTAMP=" line, a
 * "_BOOT_ID=" line, and then the entry's data fields. A field rejected by
 * utf8_is_printable_newline() is routed to ENTRY_BINARY_FIELD_START, which
 * writes the field name and '\n'; ENTRY_BINARY_FIELD_SIZE writes a 64-bit
 * little-endian length.
 *
 * buf:  destination buffer.
 * size: capacity of buf in bytes; asserted to be <= SSIZE_MAX.
 * u:    uploader; journal, entry_state, current_cursor, field_data,
 *       field_length and field_pos are read and/or updated here.
 *
 * Returns the result of log_error_errno() when an sd_journal_*() call fails.
 *
 * NOTE(review): this copy of the function is missing statements. The
 * declarations of r, pos, realtime, monotonic, boot_id, sid, done, tocopy, c,
 * len and le64, the conditions guarding most "return" statements, the first
 * case label, most state transitions and the success returns are not visible.
 * Comments below only describe what is shown; confirm against the full file.
 */
36 static ssize_t
write_entry(char *buf
, size_t size
, Uploader
*u
) {
/* Presumably required because byte counts are returned as ssize_t. */
40 assert(size
<= SSIZE_MAX
);
44 switch(u
->entry_state
) {
/*
 * Cursor part (case label not visible; presumably ENTRY_CURSOR, the state the
 * callers set before starting an entry). The previous cursor string is handed
 * to mfree() and the member is overwritten with its result before a new
 * cursor is fetched. NOTE(review): mfree() is a project helper; presumably it
 * frees its argument and returns NULL - confirm in alloc-util.h.
 */
46 u
->current_cursor
= mfree(u
->current_cursor
);
48 r
= sd_journal_get_cursor(u
->journal
, &u
->current_cursor
);
50 return log_error_errno(r
, "Failed to get cursor: %m");
/* Format the line into the space that remains after pos. */
52 r
= snprintf(buf
+ pos
, size
- pos
,
53 "__CURSOR=%s\n", u
->current_cursor
);
55 /* not enough space */
/*
 * snprintf() returns the length it wanted to write, excluding the NUL. When
 * pos + r == size only the last character of the format (the '\n') was
 * displaced by the terminating NUL, so the line can still be completed.
 */
60 if (pos
+ r
== size
) {
61 /* exactly one character short, but we don't need it */
69 case ENTRY_REALTIME
: {
/* Same pattern as above, for the entry's realtime timestamp. */
72 r
= sd_journal_get_realtime_usec(u
->journal
, &realtime
);
74 return log_error_errno(r
, "Failed to get realtime timestamp: %m");
76 r
= snprintf(buf
+ pos
, size
- pos
,
77 "__REALTIME_TIMESTAMP="USEC_FMT
"\n", realtime
);
79 /* not enough space */
84 if (r
+ pos
== size
) {
85 /* exactly one character short, but we don't need it */
93 case ENTRY_MONOTONIC
: {
/* Fetches both the monotonic timestamp and the boot ID; only the timestamp is printed here. */
97 r
= sd_journal_get_monotonic_usec(u
->journal
, &monotonic
, &boot_id
);
99 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
101 r
= snprintf(buf
+ pos
, size
- pos
,
102 "__MONOTONIC_TIMESTAMP="USEC_FMT
"\n", monotonic
);
104 /* not enough space */
109 if (r
+ pos
== size
) {
110 /* exactly one character short, but we don't need it */
/* Put the truncated '\n' where snprintf() left the terminating NUL. */
111 buf
[size
- 1] = '\n';
118 case ENTRY_BOOT_ID
: {
/*
 * The boot ID is obtained through sd_journal_get_monotonic_usec() with NULL
 * passed for the timestamp, which is why the error text below still mentions
 * the monotonic timestamp.
 */
122 r
= sd_journal_get_monotonic_usec(u
->journal
, NULL
, &boot_id
);
124 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
/* sd_id128_to_string() renders boot_id into sid; the result is printed with %s. */
126 r
= snprintf(buf
+ pos
, size
- pos
,
127 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id
, sid
));
129 /* not enough space */
134 if (r
+ pos
== size
) {
135 /* exactly one character short, but we don't need it */
136 buf
[size
- 1] = '\n';
143 case ENTRY_NEW_FIELD
: {
/*
 * Advance to the next data field of the entry. NOTE(review): the remaining
 * arguments of sd_journal_enumerate_data() are not visible; presumably they
 * receive u->field_data and u->field_length, which are read below.
 */
146 r
= sd_journal_enumerate_data(u
->journal
,
150 return log_error_errno(r
, "Failed to move to next field in entry: %m");
/* NOTE(review): guarding condition not visible; presumably taken when no field is left. */
152 u
->entry_state
= ENTRY_OUTRO
;
/* Fields that fail the printable check take the binary path. */
156 if (!utf8_is_printable_newline(u
->field_data
,
157 u
->field_length
, false)) {
158 u
->entry_state
= ENTRY_BINARY_FIELD_START
;
165 case ENTRY_TEXT_FIELD
:
166 case ENTRY_BINARY_FIELD
: {
/*
 * done is true when the remaining buffer space is strictly larger than the
 * unsent part of the field; the strict comparison leaves one byte for the
 * '\n' stored at buf[pos + tocopy] below.
 */
170 done
= size
- pos
> u
->field_length
- u
->field_pos
;
/* NOTE(review): likely one branch of a conditional whose other branch is not visible. */
172 tocopy
= u
->field_length
- u
->field_pos
;
/* Source of the copy: the field data starting at the current field offset. */
177 (char*) u
->field_data
+ u
->field_pos
,
181 buf
[pos
+ tocopy
] = '\n';
/* Field finished: go back to fetching the next field. */
183 u
->entry_state
= ENTRY_NEW_FIELD
;
/* Field not finished: remember how much of it has been sent so far. */
186 u
->field_pos
+= tocopy
;
191 case ENTRY_BINARY_FIELD_START
: {
/* Locate the '=' in the field; it must exist and must not be the first byte. */
195 c
= memchr(u
->field_data
, '=', u
->field_length
);
196 if (!c
|| c
== u
->field_data
) {
197 log_error("Invalid field.");
/* len is the number of bytes before the '='. */
201 len
= c
- (const char*)u
->field_data
;
203 /* need space for label + '\n' */
204 if (size
- pos
< len
+ 1)
/* Emit the part before '=' followed by '\n' instead of '='. */
207 memcpy(buf
+ pos
, u
->field_data
, len
);
208 buf
[pos
+ len
] = '\n';
/* Continue after the '=' on the next step. */
211 u
->field_pos
= len
+ 1;
215 case ENTRY_BINARY_FIELD_SIZE
: {
218 /* need space for uint64_t */
/* Number of field bytes remaining after field_pos, as a little-endian 64-bit value (8 bytes). */
222 le64
= htole64(u
->field_length
- u
->field_pos
);
223 memcpy(buf
+ pos
, &le64
, 8);
231 /* need space for '\n' */
242 assert_not_reached("WTF?");
245 assert_not_reached("WTF?");
/*
 * journal_input_callback() - fill buf with serialized journal entries.
 *
 * This function is handed to start_upload() by process_journal_input(). Its
 * signature and its use of CURL_READFUNC_ABORT match a libcurl read callback
 * (presumably installed via CURLOPT_READFUNCTION - confirm in start_upload()).
 *
 * buf:   destination buffer with room for size * nmemb bytes.
 * size:  element size.
 * nmemb: element count.
 * userp: opaque pointer; NOTE(review): presumably the Uploader, since
 *        start_upload() is given u as its last argument - the conversion to u
 *        is not visible here.
 *
 * Returns CURL_READFUNC_ABORT on the error paths shown. NOTE(review): the
 * declarations of u, j, r, w and filled, most conditions and the normal
 * return statement are not visible in this copy.
 */
248 static size_t journal_input_callback(void *buf
, size_t size
, size_t nmemb
, void *userp
) {
/*
 * Keeps size * nmemb within SSIZE_MAX, the limit write_entry() asserts on.
 * NOTE(review): size == 0 would divide by zero here.
 */
256 assert(nmemb
<= SSIZE_MAX
/ size
);
/* Keep writing while a journal is open and the buffer is not full. */
260 while (j
&& filled
< size
* nmemb
) {
/* The previous entry is complete: advance to the next one. */
261 if (u
->entry_state
== ENTRY_DONE
) {
262 r
= sd_journal_next(j
);
264 log_error_errno(r
, "Failed to move to next entry in journal: %m");
265 return CURL_READFUNC_ABORT
;
/* NOTE(review): the conditions selecting between the two messages below are not visible. */
268 log_debug("No more entries, waiting for journal.");
270 log_info("No more entries, closing journal.");
271 close_journal_input(u
);
274 u
->uploading
= false;
/* Start the new entry from its first part. */
279 u
->entry_state
= ENTRY_CURSOR
;
/* Append to the unused tail of the buffer. */
282 w
= write_entry((char*)buf
+ filled
, size
* nmemb
- filled
, u
);
284 return CURL_READFUNC_ABORT
;
288 log_error("Buffer space is too small to write entry.");
289 return CURL_READFUNC_ABORT
;
290 } else if (u
->entry_state
!= ENTRY_DONE
)
291 /* This means that all available space was used up */
294 log_debug("Entry %zu (%s) has been uploaded.",
295 u
->entries_sent
, u
->current_cursor
);
/*
 * close_journal_input() - stop reading from the journal attached to u.
 *
 * Logs a debug message and closes u->journal with sd_journal_close().
 * NOTE(review): further statements of this function are not visible in this
 * copy (for example whether u->journal is reset afterwards) - confirm against
 * the full file.
 */
301 void close_journal_input(Uploader
*u
) {
305 log_debug("Closing journal input.");
307 sd_journal_close(u
->journal
);
/*
 * process_journal_input() - move forward in the journal and start an upload.
 *
 * u:    uploader whose journal is advanced.
 * skip: number of entries passed to sd_journal_next_skip().
 *
 * Returns the result of log_error_errno() on the error path shown, otherwise
 * the result of start_upload(). NOTE(review): the declaration of r and the
 * conditions between the statements are not visible in this copy.
 */
313 static int process_journal_input(Uploader
*u
, int skip
) {
316 r
= sd_journal_next_skip(u
->journal
, skip
);
318 return log_error_errno(r
, "Failed to skip to next entry: %m");
/* Begin the entry with its first part, then hand the read callback to the uploader. */
323 u
->entry_state
= ENTRY_CURSOR
;
324 return start_upload(u
, journal_input_callback
, u
);
/*
 * check_journal_input() - look for new journal data and upload it.
 *
 * When u->input_event is set, pending journal events are consumed with
 * sd_journal_process() first. The function ends by calling
 * process_journal_input() with a skip of 1.
 *
 * NOTE(review): the declaration of r, the error condition and the statements
 * executed after the error log and for SD_JOURNAL_NOP are not visible in this
 * copy.
 */
327 int check_journal_input(Uploader
*u
) {
328 if (u
->input_event
) {
331 r
= sd_journal_process(u
->journal
);
/* On the failure path the journal input is closed after logging. */
333 log_error_errno(r
, "Failed to process journal: %m");
334 close_journal_input(u
);
/* SD_JOURNAL_NOP is handled separately; presumably nothing new is uploaded - TODO confirm. */
338 if (r
== SD_JOURNAL_NOP
)
342 return process_journal_input(u
, 1);
/*
 * dispatch_journal_input() - I/O event handler for the journal file descriptor.
 *
 * Registered through sd_event_add_io() in open_journal_for_upload(). Ends by
 * delegating to check_journal_input().
 *
 * NOTE(review): the rest of the parameter list, the derivation of u and the
 * condition guarding the warning are not visible in this copy.
 */
345 static int dispatch_journal_input(sd_event_source
*event
,
/* Presumably emitted when an upload is already in progress, as the message says. */
354 log_warning("dispatch_journal_input called when uploading, ignoring.");
358 log_debug("Detected journal input, checking for new data.");
359 return check_journal_input(u
);
/*
 * open_journal_for_upload() - prepare journal j as the upload source for u.
 *
 * Visible steps: set the data threshold of j to 0, obtain the journal's file
 * descriptor and event mask, register dispatch_journal_input() for that
 * descriptor on u->events, optionally seek to a cursor, and finally call
 * process_journal_input().
 *
 * Returns the result of log_error_errno() on the error paths shown, otherwise
 * the result of process_journal_input().
 *
 * NOTE(review): the rest of the parameter list (j, cursor and after_cursor
 * are used below), the local declarations and the if/else structure are not
 * visible in this copy.
 */
362 int open_journal_for_upload(Uploader
*u
,
/* NOTE(review): 0 presumably disables the field size threshold - confirm in sd_journal_set_data_threshold(3). */
371 sd_journal_set_data_threshold(j
, 0);
374 fd
= sd_journal_get_fd(j
);
376 return log_error_errno(fd
, "sd_journal_get_fd failed: %m");
378 events
= sd_journal_get_events(j
);
/* NOTE(review): how this result selects the timeout is not visible here. */
380 r
= sd_journal_reliable_fd(j
);
385 u
->timeout
= JOURNAL_UPLOAD_POLL_TIMEOUT
;
/* Have the event loop call dispatch_journal_input() with u when fd signals events. */
387 r
= sd_event_add_io(u
->events
, &u
->input_event
,
388 fd
, events
, dispatch_journal_input
, u
);
390 return log_error_errno(r
, "Failed to register input event: %m");
/* A timeout equal to (uint64_t) -1 is printed as -1; any other value is cast to int. */
392 log_debug("Listening for journal events on fd:%d, timeout %d",
393 fd
, u
->timeout
== (uint64_t) -1 ? -1 : (int) u
->timeout
);
395 log_debug("Not listening for journal events.");
398 r
= sd_journal_seek_cursor(j
, cursor
);
400 return log_error_errno(r
, "Failed to seek to cursor %s: %m",
/* Skip one entry, or two when after_cursor is non-zero. */
404 return process_journal_input(u
, 1 + !!after_cursor
);