2 This file is part of systemd.
4 Copyright 2014 Zbigniew Jędrzejewski-Szmek
6 systemd is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 systemd is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License
17 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 #include <curl/curl.h>
23 #include "alloc-util.h"
24 #include "journal-upload.h"
28 #include "sd-daemon.h"
31 * Write up to size bytes to buf. Return negative on error, and number of
32 * bytes written otherwise. The last case is a kind of an error too.
/*
 * write_entry() - serialize the current entry of u->journal into buf.
 *
 * The function is a state machine keyed on u->entry_state. The states visible
 * here emit, in source order, the cursor, the realtime timestamp, the
 * monotonic timestamp, the boot ID and then the entry's data fields. A field
 * that is not printable UTF-8 is emitted through the binary-field states:
 * field name, 64-bit little-endian length, then the raw bytes.
 *
 * buf:  destination buffer
 * size: capacity of buf in bytes; asserted to be at most SSIZE_MAX
 * u:    uploader state; the code reads and updates u->entry_state,
 *       u->current_cursor, u->field_data, u->field_length and u->field_pos
 *
 * Failures of the sd_journal_*() calls are logged and returned through
 * log_error_errno().
 *
 * NOTE(review): several statements of this function (local declarations,
 * error checks, state transitions and most return statements) are not
 * visible in this view; confirm the exact return values against the full
 * source before relying on this description.
 */
34 static ssize_t
write_entry(char *buf
, size_t size
, Uploader
*u
) {
38 assert(size
<= SSIZE_MAX
);
42 switch(u
->entry_state
) {
/* Release the cursor string of the previous entry; the value returned by
 * mfree() is stored back into the field before a new cursor is fetched. */
44 u
->current_cursor
= mfree(u
->current_cursor
);
46 r
= sd_journal_get_cursor(u
->journal
, &u
->current_cursor
);
48 return log_error_errno(r
, "Failed to get cursor: %m");
/* Format the "__CURSOR=" line into the space left after pos. */
50 r
= snprintf(buf
+ pos
, size
- pos
,
51 "__CURSOR=%s\n", u
->current_cursor
);
53 /* not enough space */
/* When the formatted length equals the remaining space, snprintf() had to
 * drop the last character (the '\n') to fit its NUL terminator. */
58 if (pos
+ r
== size
) {
59 /* exactly one character short, but we don't need it */
/* Emits the entry's realtime timestamp as a "__REALTIME_TIMESTAMP=" line. */
67 case ENTRY_REALTIME
: {
70 r
= sd_journal_get_realtime_usec(u
->journal
, &realtime
);
72 return log_error_errno(r
, "Failed to get realtime timestamp: %m");
74 r
= snprintf(buf
+ pos
, size
- pos
,
75 "__REALTIME_TIMESTAMP="USEC_FMT
"\n", realtime
);
77 /* not enough space */
82 if (r
+ pos
== size
) {
83 /* exactly one character short, but we don't need it */
/* Emits the entry's monotonic timestamp as a "__MONOTONIC_TIMESTAMP=" line. */
91 case ENTRY_MONOTONIC
: {
95 r
= sd_journal_get_monotonic_usec(u
->journal
, &monotonic
, &boot_id
);
97 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
99 r
= snprintf(buf
+ pos
, size
- pos
,
100 "__MONOTONIC_TIMESTAMP="USEC_FMT
"\n", monotonic
);
102 /* not enough space */
107 if (r
+ pos
== size
) {
108 /* exactly one character short, but we don't need it */
/* Overwrite the NUL that snprintf() placed in the last byte with the
 * newline it displaced. */
109 buf
[size
- 1] = '\n';
/* Emits the boot ID; only the boot_id output of
 * sd_journal_get_monotonic_usec() is requested (the timestamp pointer is NULL). */
116 case ENTRY_BOOT_ID
: {
120 r
= sd_journal_get_monotonic_usec(u
->journal
, NULL
, &boot_id
);
122 return log_error_errno(r
, "Failed to get monotonic timestamp: %m");
124 r
= snprintf(buf
+ pos
, size
- pos
,
125 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id
, sid
));
127 /* not enough space */
132 if (r
+ pos
== size
) {
133 /* exactly one character short, but we don't need it */
134 buf
[size
- 1] = '\n';
/* Fetches the next data field of the entry and picks the state used to emit it. */
141 case ENTRY_NEW_FIELD
: {
144 r
= sd_journal_enumerate_data(u
->journal
,
148 return log_error_errno(r
, "Failed to move to next field in entry: %m");
/* NOTE(review): the condition guarding this transition is not visible here;
 * presumably it is taken when the entry has no further fields - confirm. */
150 u
->entry_state
= ENTRY_OUTRO
;
/* Field data that fails the printable-UTF-8 check is routed to the
 * binary-field states. */
154 if (!utf8_is_printable_newline(u
->field_data
,
155 u
->field_length
, false)) {
156 u
->entry_state
= ENTRY_BINARY_FIELD_START
;
/* Copies field bytes starting at u->field_pos; both states share this code. */
163 case ENTRY_TEXT_FIELD
:
164 case ENTRY_BINARY_FIELD
: {
/* done is true only when the free space is strictly larger than the bytes
 * still to copy, which leaves room for the '\n' written after the data. */
168 done
= size
- pos
> u
->field_length
- u
->field_pos
;
170 tocopy
= u
->field_length
- u
->field_pos
;
175 (char*) u
->field_data
+ u
->field_pos
,
179 buf
[pos
+ tocopy
] = '\n';
181 u
->entry_state
= ENTRY_NEW_FIELD
;
/* Remember how much of the field has been emitted so far. */
184 u
->field_pos
+= tocopy
;
/* Emits the name of a binary field on a line of its own. */
189 case ENTRY_BINARY_FIELD_START
: {
/* The name ends at the first '='; a field without '=' or with an empty
 * name is reported as invalid. */
193 c
= memchr(u
->field_data
, '=', u
->field_length
);
194 if (!c
|| c
== u
->field_data
) {
195 log_error("Invalid field.");
199 len
= c
- (const char*)u
->field_data
;
201 /* need space for label + '\n' */
202 if (size
- pos
< len
+ 1)
205 memcpy(buf
+ pos
, u
->field_data
, len
);
206 buf
[pos
+ len
] = '\n';
/* Continue after the name and the '=' separator. */
209 u
->field_pos
= len
+ 1;
/* Emits the number of remaining field bytes as a little-endian 64-bit value. */
213 case ENTRY_BINARY_FIELD_SIZE
: {
216 /* need space for uint64_t */
220 le64
= htole64(u
->field_length
- u
->field_pos
);
221 memcpy(buf
+ pos
, &le64
, 8);
229 /* need space for '\n' */
240 assert_not_reached("WTF?");
243 assert_not_reached("WTF?");
246 static inline void check_update_watchdog(Uploader
*u
) {
247 usec_t watchdog_usec
;
248 static usec_t before
;
252 if (sd_watchdog_enabled(false, &watchdog_usec
) < 0)
254 if (u
->reset_reference_timestamp
) {
255 before
= now(CLOCK_MONOTONIC
);
256 u
->reset_reference_timestamp
= false;
258 after
= now(CLOCK_MONOTONIC
);
259 elapsed_time
= usec_sub(after
, before
);
260 if (elapsed_time
> watchdog_usec
/ 2) {
261 log_debug("Update watchdog timer");
262 sd_notify(false, "WATCHDOG=1");
263 u
->reset_reference_timestamp
= true;
/*
 * journal_input_callback() - read callback that fills buf with journal data.
 *
 * This function is the callback handed to start_upload() by
 * process_journal_input(). It fills up to size * nmemb bytes of buf by calling
 * write_entry() repeatedly, advancing to the next journal entry whenever the
 * current one is in state ENTRY_DONE.
 *
 * buf:   destination buffer
 * size:  element size; size * nmemb is the buffer capacity in bytes
 * nmemb: element count
 * userp: opaque user pointer
 *
 * Returns CURL_READFUNC_ABORT on the failure paths visible here.
 *
 * NOTE(review): the local declarations (u, j, filled, r, w), the conditions
 * around the sd_journal_next() and write_entry() results and the final return
 * are not visible in this view; u is presumably derived from userp - confirm.
 */
268 static size_t journal_input_callback(void *buf
, size_t size
, size_t nmemb
, void *userp
) {
/* Bounds nmemb so that size * nmemb does not exceed SSIZE_MAX.
 * NOTE(review): divides by size, so size is assumed to be non-zero. */
276 assert(nmemb
<= SSIZE_MAX
/ size
);
278 check_update_watchdog(u
);
/* Keep writing while a journal is available and the buffer is not full. */
282 while (j
&& filled
< size
* nmemb
) {
/* The previous entry is finished: advance to the next one. */
283 if (u
->entry_state
== ENTRY_DONE
) {
284 r
= sd_journal_next(j
);
286 log_error_errno(r
, "Failed to move to next entry in journal: %m");
287 return CURL_READFUNC_ABORT
;
290 log_debug("No more entries, waiting for journal.");
292 log_info("No more entries, closing journal.");
293 close_journal_input(u
);
296 u
->uploading
= false;
/* Start serializing the new entry from its first state. */
301 u
->entry_state
= ENTRY_CURSOR
;
/* Append to what is already in the buffer, using only the space left. */
304 w
= write_entry((char*)buf
+ filled
, size
* nmemb
- filled
, u
);
306 return CURL_READFUNC_ABORT
;
310 log_error("Buffer space is too small to write entry.");
311 return CURL_READFUNC_ABORT
;
312 } else if (u
->entry_state
!= ENTRY_DONE
)
313 /* This means that all available space was used up */
316 log_debug("Entry %zu (%s) has been uploaded.",
317 u
->entries_sent
, u
->current_cursor
);
/*
 * close_journal_input() - stop reading from the journal.
 *
 * u: uploader whose journal handle is closed
 *
 * Logs a debug message and closes u->journal with sd_journal_close().
 *
 * NOTE(review): the remaining statements of this function are not visible in
 * this view; confirm whether u->journal is reset afterwards.
 */
323 void close_journal_input(Uploader
*u
) {
327 log_debug("Closing journal input.");
329 sd_journal_close(u
->journal
);
/*
 * process_journal_input() - advance the journal and start an upload.
 *
 * u:    uploader state
 * skip: number of entries passed to sd_journal_next_skip()
 *
 * Returns the result of start_upload(), or the value produced by
 * log_error_errno() when skipping fails.
 *
 * NOTE(review): the checks on the sd_journal_next_skip() result are not
 * visible in this view.
 */
335 static int process_journal_input(Uploader
*u
, int skip
) {
341 r
= sd_journal_next_skip(u
->journal
, skip
);
343 return log_error_errno(r
, "Failed to skip to next entry: %m");
/* Begin serializing the entry from its first state. */
348 u
->entry_state
= ENTRY_CURSOR
;
/* journal_input_callback() supplies the upload data; u is passed along as
 * its user pointer. */
349 return start_upload(u
, journal_input_callback
, u
);
/*
 * check_journal_input() - look for new journal data and upload it.
 *
 * u: uploader state
 *
 * When an input event source is registered (u->input_event is set), the
 * journal is first processed with sd_journal_process(). The visible success
 * path ends by calling process_journal_input() with a skip of 1.
 *
 * NOTE(review): the error check, the statement executed for SD_JOURNAL_NOP
 * and the related returns are not visible in this view.
 */
352 int check_journal_input(Uploader
*u
) {
353 if (u
->input_event
) {
356 r
= sd_journal_process(u
->journal
);
/* Processing failed: log the error and stop reading the journal. */
358 log_error_errno(r
, "Failed to process journal: %m");
359 close_journal_input(u
);
363 if (r
== SD_JOURNAL_NOP
)
367 return process_journal_input(u
, 1);
/*
 * dispatch_journal_input() - I/O event handler for the journal file
 * descriptor; registered through sd_event_add_io() in
 * open_journal_for_upload().
 *
 * Logs a debug message and returns the result of check_journal_input().
 *
 * NOTE(review): the rest of the parameter list and the origin of u are not
 * visible in this view.
 */
370 static int dispatch_journal_input(sd_event_source
*event
,
381 log_debug("Detected journal input, checking for new data.");
382 return check_journal_input(u
);
/*
 * open_journal_for_upload() - prepare journal j as the upload source for u.
 *
 * Visible steps: set the data threshold to 0, obtain the journal's file
 * descriptor and poll events, register dispatch_journal_input() as an I/O
 * event handler, seek to a cursor, and finally start processing through
 * process_journal_input().
 *
 * Returns the result of process_journal_input(), or the value produced by
 * log_error_errno() on the failure paths visible here.
 *
 * NOTE(review): the rest of the parameter list (j, cursor and after_cursor
 * are used below) and the conditions selecting between the branches are not
 * visible in this view.
 */
385 int open_journal_for_upload(Uploader
*u
,
/* NOTE(review): a threshold of 0 should turn the data size limit off, per
 * sd_journal_set_data_threshold(3) - confirm. */
394 sd_journal_set_data_threshold(j
, 0);
397 fd
= sd_journal_get_fd(j
);
399 return log_error_errno(fd
, "sd_journal_get_fd failed: %m");
401 events
= sd_journal_get_events(j
);
403 r
= sd_journal_reliable_fd(j
);
408 u
->timeout
= JOURNAL_UPLOAD_POLL_TIMEOUT
;
/* Have the event loop call dispatch_journal_input() with u as user data
 * for the requested events on fd. */
410 r
= sd_event_add_io(u
->events
, &u
->input_event
,
411 fd
, events
, dispatch_journal_input
, u
);
413 return log_error_errno(r
, "Failed to register input event: %m");
/* A timeout of (uint64_t) -1 is printed as -1. */
415 log_debug("Listening for journal events on fd:%d, timeout %d",
416 fd
, u
->timeout
== (uint64_t) -1 ? -1 : (int) u
->timeout
);
418 log_debug("Not listening for journal events.");
421 r
= sd_journal_seek_cursor(j
, cursor
);
423 return log_error_errno(r
, "Failed to seek to cursor %s: %m",
/* !! maps after_cursor to 0 or 1, so one extra entry is skipped when
 * after_cursor is set. */
427 return process_journal_input(u
, 1 + !!after_cursor
);