1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include <curl/curl.h>
25 #include "alloc-util.h"
26 #include "journal-upload.h"
32 * Write up to size bytes to buf. Return negative on error, and number of
33 * bytes written otherwise. The last case is a kind of error too.
/*
 * Serializes the next piece of the current journal entry into buf, selecting
 * what to emit from u->entry_state. Asserts that size fits in ssize_t. The
 * error paths visible below return the value of log_error_errno().
 * NOTE(review): the per-state transitions and return values are only partly
 * confirmed by the statements below; verify against the complete function.
 */
35 static ssize_t
write_entry(char *buf
, size_t size
, Uploader
*u
) {
39 assert(size
<= SSIZE_MAX
);
43 switch(u
->entry_state
) {
/* Release the previously stored cursor string, then fetch the cursor of the
 * current entry into u->current_cursor. */
45 u
->current_cursor
= mfree(u
->current_cursor
);
47 r
= sd_journal_get_cursor(u
->journal
, &u
->current_cursor
);
49 return log_error_errno(r
, "Failed to get cursor: %m");
/* Append the cursor as a "__CURSOR=" line at offset pos; snprintf is limited
 * to the size - pos bytes that remain in buf. */
51 r
= snprintf(buf
+ pos
, size
- pos
,
52 "__CURSOR=%s\n", u
->current_cursor
);
54 /* not enough space */
59 if (pos
+ r
== size
) {
60 /* exactly one character short, but we don't need it */
/* Emit the entry's realtime timestamp as a "__REALTIME_TIMESTAMP=" line. */
68 case ENTRY_REALTIME
: {
71 r
= sd_journal_get_realtime_usec(u
->journal
, &realtime
);
73 return log_error_errno(r
, "Failed to get realtime timestamp: %m");
75 r
= snprintf(buf
+ pos
, size
- pos
,
76 "__REALTIME_TIMESTAMP="USEC_FMT
"\n", realtime
);
78 /* not enough space */
83 if (r
+ pos
== size
) {
84 /* exactly one character short, but we don't need it */
/* Emit the entry's monotonic timestamp as a "__MONOTONIC_TIMESTAMP=" line;
 * the boot ID is fetched by the same call but not printed here. */
92 case ENTRY_MONOTONIC
: {
96 r
= sd_journal_get_monotonic_usec(u
->journal
, &monotonic
, &boot_id
);
98 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
100 r
= snprintf(buf
+ pos
, size
- pos
,
101 "__MONOTONIC_TIMESTAMP="USEC_FMT
"\n", monotonic
);
103 /* not enough space */
108 if (r
+ pos
== size
) {
109 /* exactly one character short, but we don't need it */
/* Put the newline into the last byte of buf (presumably the byte snprintf
 * used for its terminating NUL when truncating - TODO confirm). */
110 buf
[size
- 1] = '\n';
/* Only the boot ID is needed for this line, so NULL is passed in place of the
 * monotonic timestamp output. */
117 case ENTRY_BOOT_ID
: {
121 r
= sd_journal_get_monotonic_usec(u
->journal
, NULL
, &boot_id
);
123 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
125 r
= snprintf(buf
+ pos
, size
- pos
,
126 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id
, sid
));
128 /* not enough space */
133 if (r
+ pos
== size
) {
134 /* exactly one character short, but we don't need it */
135 buf
[size
- 1] = '\n';
/* Fetch the next field of the entry with sd_journal_enumerate_data(). The
 * follow-up states set below are ENTRY_OUTRO (presumably when no field is
 * left - TODO confirm) and ENTRY_BINARY_FIELD_START. */
142 case ENTRY_NEW_FIELD
: {
145 r
= sd_journal_enumerate_data(u
->journal
,
149 return log_error_errno(r
, "Failed to move to next field in entry: %m");
151 u
->entry_state
= ENTRY_OUTRO
;
/* Field data that fails utf8_is_printable_newline() is sent down the
 * binary-field path. */
155 if (!utf8_is_printable_newline(u
->field_data
,
156 u
->field_length
, false)) {
157 u
->entry_state
= ENTRY_BINARY_FIELD_START
;
/* Text and binary payloads share this copy logic; u->field_pos records how
 * much of u->field_data has been consumed so far. */
164 case ENTRY_TEXT_FIELD
:
165 case ENTRY_BINARY_FIELD
: {
/* done: the space left in buf strictly exceeds the field bytes left, which
 * presumably leaves room for the newline stored after the data below. */
169 done
= size
- pos
> u
->field_length
- u
->field_pos
;
171 tocopy
= u
->field_length
- u
->field_pos
;
176 (char*) u
->field_data
+ u
->field_pos
,
180 buf
[pos
+ tocopy
] = '\n';
182 u
->entry_state
= ENTRY_NEW_FIELD
;
185 u
->field_pos
+= tocopy
;
/* A binary field starts with its name alone: the bytes of u->field_data that
 * precede the first '='. */
190 case ENTRY_BINARY_FIELD_START
: {
194 c
= memchr(u
->field_data
, '=', u
->field_length
);
/* Reject data with no '=' at all or with an empty name before it. */
195 if (!c
|| c
== u
->field_data
) {
196 log_error("Invalid field.");
200 len
= c
- (const char*)u
->field_data
;
202 /* need space for label + '\n' */
203 if (size
- pos
< len
+ 1)
206 memcpy(buf
+ pos
, u
->field_data
, len
);
207 buf
[pos
+ len
] = '\n';
/* Step over the name and the '=' so that field_pos refers to the payload. */
210 u
->field_pos
= len
+ 1;
/* The payload length (field_length - field_pos) is written as a 64-bit
 * little-endian integer occupying 8 bytes. */
214 case ENTRY_BINARY_FIELD_SIZE
: {
217 /* need space for uint64_t */
221 le64
= htole64(u
->field_length
- u
->field_pos
);
222 memcpy(buf
+ pos
, &le64
, 8);
230 /* need space for '\n' */
241 assert_not_reached("WTF?");
244 assert_not_reached("WTF?");
/*
 * Read callback handed to start_upload() by process_journal_input(). Fills
 * buf, which offers size * nmemb bytes, with serialized journal data by
 * calling write_entry() on the unused tail of the buffer. The error paths
 * visible below return CURL_READFUNC_ABORT.
 * NOTE(review): the successful return value and the exact branch conditions
 * are not confirmed by the statements below; verify against the complete
 * function.
 */
247 static size_t journal_input_callback(void *buf
, size_t size
, size_t nmemb
, void *userp
) {
/* Guard the size * nmemb product used below against exceeding SSIZE_MAX. */
255 assert(nmemb
<= SSIZE_MAX
/ size
);
259 while (j
&& filled
< size
* nmemb
) {
/* The previous entry has been fully written: advance to the next one. */
260 if (u
->entry_state
== ENTRY_DONE
) {
261 r
= sd_journal_next(j
);
263 log_error_errno(r
, "Failed to move to next entry in journal: %m");
264 return CURL_READFUNC_ABORT
;
267 log_debug("No more entries, waiting for journal.");
269 log_info("No more entries, closing journal.");
270 close_journal_input(u
);
273 u
->uploading
= false;
/* Begin serializing the new entry with its cursor line. */
278 u
->entry_state
= ENTRY_CURSOR
;
/* Append after the filled bytes already in buf; only the remaining space is
 * offered to write_entry(). */
281 w
= write_entry((char*)buf
+ filled
, size
* nmemb
- filled
, u
);
283 return CURL_READFUNC_ABORT
;
287 log_error("Buffer space is too small to write entry.");
288 return CURL_READFUNC_ABORT
;
289 } else if (u
->entry_state
!= ENTRY_DONE
)
290 /* This means that all available space was used up */
293 log_debug("Entry %zu (%s) has been uploaded.",
294 u
->entries_sent
, u
->current_cursor
);
/*
 * Logs "Closing journal input." and closes the journal handle stored in
 * u->journal with sd_journal_close().
 * NOTE(review): any guard around these calls and any further cleanup of u are
 * not confirmed by the statements below.
 */
300 void close_journal_input(Uploader
*u
) {
304 log_debug("Closing journal input.");
306 sd_journal_close(u
->journal
);
/*
 * Advances u->journal by skip entries with sd_journal_next_skip(), resets the
 * serializer state to ENTRY_CURSOR and starts an upload that pulls its data
 * through journal_input_callback. A failure to skip is logged and returned
 * via log_error_errno().
 */
312 static int process_journal_input(Uploader
*u
, int skip
) {
315 r
= sd_journal_next_skip(u
->journal
, skip
);
317 return log_error_errno(r
, "Failed to skip to next entry: %m");
/* Every entry is serialized starting with its cursor line. */
322 u
->entry_state
= ENTRY_CURSOR
;
323 return start_upload(u
, journal_input_callback
, u
);
/*
 * Checks the journal for new data. When an input event source is registered
 * (u->input_event), sd_journal_process() is called first; a failure there is
 * logged and the journal input is closed. The function otherwise continues
 * with process_journal_input(u, 1).
 * NOTE(review): SD_JOURNAL_NOP is special-cased below, but what is returned in
 * that case is not confirmed by the statements shown.
 */
326 int check_journal_input(Uploader
*u
) {
327 if (u
->input_event
) {
330 r
= sd_journal_process(u
->journal
);
332 log_error_errno(r
, "Failed to process journal: %m");
333 close_journal_input(u
);
337 if (r
== SD_JOURNAL_NOP
)
/* Move on by exactly one entry and upload it. */
341 return process_journal_input(u
, 1);
/*
 * Event callback registered through sd_event_add_io() in
 * open_journal_for_upload(). Per the log messages below, a call that arrives
 * while an upload is in progress is ignored with a warning; otherwise the
 * journal is checked for new data via check_journal_input().
 * NOTE(review): the remaining parameters and the condition selecting between
 * the two paths are not confirmed by the statements shown.
 */
344 static int dispatch_journal_input(sd_event_source
*event
,
353 log_warning("dispatch_journal_input called when uploading, ignoring.");
357 log_debug("Detected journal input, checking for new data.");
358 return check_journal_input(u
);
/*
 * Prepares journal j as the upload source for u: sets the data threshold,
 * obtains the journal's file descriptor and poll events, registers
 * dispatch_journal_input as an I/O event handler on u->events, seeks to
 * cursor and finally starts processing with process_journal_input().
 * Failures visible below are logged and returned via log_error_errno().
 * NOTE(review): the full parameter list and the conditions guarding the
 * "listening" / "not listening" and cursor branches are not confirmed by the
 * statements shown.
 */
361 int open_journal_for_upload(Uploader
*u
,
/* A threshold of 0 is requested - presumably so that field data is returned
 * without a size limit; confirm against the sd-journal documentation. */
370 sd_journal_set_data_threshold(j
, 0);
373 fd
= sd_journal_get_fd(j
);
375 return log_error_errno(fd
, "sd_journal_get_fd failed: %m");
377 events
= sd_journal_get_events(j
);
379 r
= sd_journal_reliable_fd(j
);
384 u
->timeout
= JOURNAL_UPLOAD_POLL_TIMEOUT
;
/* Have dispatch_journal_input invoked, with u as its user data, when the
 * journal fd signals the requested events. */
386 r
= sd_event_add_io(u
->events
, &u
->input_event
,
387 fd
, events
, dispatch_journal_input
, u
);
389 return log_error_errno(r
, "Failed to register input event: %m");
/* A timeout equal to (uint64_t) -1 is printed as -1. */
391 log_debug("Listening for journal events on fd:%d, timeout %d",
392 fd
, u
->timeout
== (uint64_t) -1 ? -1 : (int) u
->timeout
);
394 log_debug("Not listening for journal events.");
397 r
= sd_journal_seek_cursor(j
, cursor
);
399 return log_error_errno(r
, "Failed to seek to cursor %s: %m",
/* Skip one entry, plus one more when after_cursor is set (!! normalizes the
 * flag to 0 or 1). */
403 return process_journal_input(u
, 1 + !!after_cursor
);