]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
66af9d5dcbcb433704677df052b1b2c53a2aba0d
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2 /***
3 This file is part of systemd.
4
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
6 ***/
7
8 #include <curl/curl.h>
9 #include <stdbool.h>
10
11 #include "sd-daemon.h"
12
13 #include "alloc-util.h"
14 #include "journal-upload.h"
15 #include "log.h"
16 #include "string-util.h"
17 #include "utf8.h"
18 #include "util.h"
19
20 /**
21 * Write up to size bytes to buf. Return negative on error, and number of
22 * bytes written otherwise. The last case is a kind of an error too.
23 */
24 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
25 int r;
26 size_t pos = 0;
27
28 assert(size <= SSIZE_MAX);
29
30 for (;;) {
31
32 switch(u->entry_state) {
33 case ENTRY_CURSOR: {
34 u->current_cursor = mfree(u->current_cursor);
35
36 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
37 if (r < 0)
38 return log_error_errno(r, "Failed to get cursor: %m");
39
40 r = snprintf(buf + pos, size - pos,
41 "__CURSOR=%s\n", u->current_cursor);
42 if (pos + r > size)
43 /* not enough space */
44 return pos;
45
46 u->entry_state++;
47
48 if (pos + r == size) {
49 /* exactly one character short, but we don't need it */
50 buf[size - 1] = '\n';
51 return size;
52 }
53
54 pos += r;
55 }
56 _fallthrough_;
57 case ENTRY_REALTIME: {
58 usec_t realtime;
59
60 r = sd_journal_get_realtime_usec(u->journal, &realtime);
61 if (r < 0)
62 return log_error_errno(r, "Failed to get realtime timestamp: %m");
63
64 r = snprintf(buf + pos, size - pos,
65 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
66 if (r + pos > size)
67 /* not enough space */
68 return pos;
69
70 u->entry_state++;
71
72 if (r + pos == size) {
73 /* exactly one character short, but we don't need it */
74 buf[size - 1] = '\n';
75 return size;
76 }
77
78 pos += r;
79 }
80 _fallthrough_;
81 case ENTRY_MONOTONIC: {
82 usec_t monotonic;
83 sd_id128_t boot_id;
84
85 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
86 if (r < 0)
87 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
88
89 r = snprintf(buf + pos, size - pos,
90 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
91 if (r + pos > size)
92 /* not enough space */
93 return pos;
94
95 u->entry_state++;
96
97 if (r + pos == size) {
98 /* exactly one character short, but we don't need it */
99 buf[size - 1] = '\n';
100 return size;
101 }
102
103 pos += r;
104 }
105 _fallthrough_;
106 case ENTRY_BOOT_ID: {
107 sd_id128_t boot_id;
108 char sid[33];
109
110 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
111 if (r < 0)
112 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
113
114 r = snprintf(buf + pos, size - pos,
115 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
116 if (r + pos > size)
117 /* not enough space */
118 return pos;
119
120 u->entry_state++;
121
122 if (r + pos == size) {
123 /* exactly one character short, but we don't need it */
124 buf[size - 1] = '\n';
125 return size;
126 }
127
128 pos += r;
129 }
130 _fallthrough_;
131 case ENTRY_NEW_FIELD: {
132 u->field_pos = 0;
133
134 r = sd_journal_enumerate_data(u->journal,
135 &u->field_data,
136 &u->field_length);
137 if (r < 0)
138 return log_error_errno(r, "Failed to move to next field in entry: %m");
139 else if (r == 0) {
140 u->entry_state = ENTRY_OUTRO;
141 continue;
142 }
143
144 /* We already printed the boot id from the data in
145 * the header, hence let's suppress it here */
146 if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
147 continue;
148
149 if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
150 u->entry_state = ENTRY_BINARY_FIELD_START;
151 continue;
152 }
153
154 u->entry_state++;
155 }
156 _fallthrough_;
157 case ENTRY_TEXT_FIELD:
158 case ENTRY_BINARY_FIELD: {
159 bool done;
160 size_t tocopy;
161
162 done = size - pos > u->field_length - u->field_pos;
163 if (done)
164 tocopy = u->field_length - u->field_pos;
165 else
166 tocopy = size - pos;
167
168 memcpy(buf + pos,
169 (char*) u->field_data + u->field_pos,
170 tocopy);
171
172 if (done) {
173 buf[pos + tocopy] = '\n';
174 pos += tocopy + 1;
175 u->entry_state = ENTRY_NEW_FIELD;
176 continue;
177 } else {
178 u->field_pos += tocopy;
179 return size;
180 }
181 }
182
183 case ENTRY_BINARY_FIELD_START: {
184 const char *c;
185 size_t len;
186
187 c = memchr(u->field_data, '=', u->field_length);
188 if (!c || c == u->field_data) {
189 log_error("Invalid field.");
190 return -EINVAL;
191 }
192
193 len = c - (const char*)u->field_data;
194
195 /* need space for label + '\n' */
196 if (size - pos < len + 1)
197 return pos;
198
199 memcpy(buf + pos, u->field_data, len);
200 buf[pos + len] = '\n';
201 pos += len + 1;
202
203 u->field_pos = len + 1;
204 u->entry_state++;
205 }
206 _fallthrough_;
207 case ENTRY_BINARY_FIELD_SIZE: {
208 uint64_t le64;
209
210 /* need space for uint64_t */
211 if (size - pos < 8)
212 return pos;
213
214 le64 = htole64(u->field_length - u->field_pos);
215 memcpy(buf + pos, &le64, 8);
216 pos += 8;
217
218 u->entry_state++;
219 continue;
220 }
221
222 case ENTRY_OUTRO:
223 /* need space for '\n' */
224 if (size - pos < 1)
225 return pos;
226
227 buf[pos++] = '\n';
228 u->entry_state++;
229 u->entries_sent++;
230
231 return pos;
232
233 default:
234 assert_not_reached("WTF?");
235 }
236 }
237 assert_not_reached("WTF?");
238 }
239
240 static inline void check_update_watchdog(Uploader *u) {
241 usec_t after;
242 usec_t elapsed_time;
243
244 if (u->watchdog_usec <= 0)
245 return;
246
247 after = now(CLOCK_MONOTONIC);
248 elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
249 if (elapsed_time > u->watchdog_usec / 2) {
250 log_debug("Update watchdog timer");
251 sd_notify(false, "WATCHDOG=1");
252 u->watchdog_timestamp = after;
253 }
254 }
255
256 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
257 Uploader *u = userp;
258 int r;
259 sd_journal *j;
260 size_t filled = 0;
261 ssize_t w;
262
263 assert(u);
264 assert(nmemb <= SSIZE_MAX / size);
265
266 check_update_watchdog(u);
267
268 j = u->journal;
269
270 while (j && filled < size * nmemb) {
271 if (u->entry_state == ENTRY_DONE) {
272 r = sd_journal_next(j);
273 if (r < 0) {
274 log_error_errno(r, "Failed to move to next entry in journal: %m");
275 return CURL_READFUNC_ABORT;
276 } else if (r == 0) {
277 if (u->input_event)
278 log_debug("No more entries, waiting for journal.");
279 else {
280 log_info("No more entries, closing journal.");
281 close_journal_input(u);
282 }
283
284 u->uploading = false;
285
286 break;
287 }
288
289 u->entry_state = ENTRY_CURSOR;
290 }
291
292 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
293 if (w < 0)
294 return CURL_READFUNC_ABORT;
295 filled += w;
296
297 if (filled == 0) {
298 log_error("Buffer space is too small to write entry.");
299 return CURL_READFUNC_ABORT;
300 } else if (u->entry_state != ENTRY_DONE)
301 /* This means that all available space was used up */
302 break;
303
304 log_debug("Entry %zu (%s) has been uploaded.",
305 u->entries_sent, u->current_cursor);
306 }
307
308 return filled;
309 }
310
311 void close_journal_input(Uploader *u) {
312 assert(u);
313
314 if (u->journal) {
315 log_debug("Closing journal input.");
316
317 sd_journal_close(u->journal);
318 u->journal = NULL;
319 }
320 u->timeout = 0;
321 }
322
323 static int process_journal_input(Uploader *u, int skip) {
324 int r;
325
326 if (u->uploading)
327 return 0;
328
329 r = sd_journal_next_skip(u->journal, skip);
330 if (r < 0)
331 return log_error_errno(r, "Failed to skip to next entry: %m");
332 else if (r < skip)
333 return 0;
334
335 /* have data */
336 u->entry_state = ENTRY_CURSOR;
337 return start_upload(u, journal_input_callback, u);
338 }
339
340 int check_journal_input(Uploader *u) {
341 if (u->input_event) {
342 int r;
343
344 r = sd_journal_process(u->journal);
345 if (r < 0) {
346 log_error_errno(r, "Failed to process journal: %m");
347 close_journal_input(u);
348 return r;
349 }
350
351 if (r == SD_JOURNAL_NOP)
352 return 0;
353 }
354
355 return process_journal_input(u, 1);
356 }
357
358 static int dispatch_journal_input(sd_event_source *event,
359 int fd,
360 uint32_t revents,
361 void *userp) {
362 Uploader *u = userp;
363
364 assert(u);
365
366 if (u->uploading)
367 return 0;
368
369 log_debug("Detected journal input, checking for new data.");
370 return check_journal_input(u);
371 }
372
373 int open_journal_for_upload(Uploader *u,
374 sd_journal *j,
375 const char *cursor,
376 bool after_cursor,
377 bool follow) {
378 int fd, r, events;
379
380 u->journal = j;
381
382 sd_journal_set_data_threshold(j, 0);
383
384 if (follow) {
385 fd = sd_journal_get_fd(j);
386 if (fd < 0)
387 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
388
389 events = sd_journal_get_events(j);
390
391 r = sd_journal_reliable_fd(j);
392 assert(r >= 0);
393 if (r > 0)
394 u->timeout = -1;
395 else
396 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
397
398 r = sd_event_add_io(u->events, &u->input_event,
399 fd, events, dispatch_journal_input, u);
400 if (r < 0)
401 return log_error_errno(r, "Failed to register input event: %m");
402
403 log_debug("Listening for journal events on fd:%d, timeout %d",
404 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
405 } else
406 log_debug("Not listening for journal events.");
407
408 if (cursor) {
409 r = sd_journal_seek_cursor(j, cursor);
410 if (r < 0)
411 return log_error_errno(r, "Failed to seek to cursor %s: %m",
412 cursor);
413 }
414
415 return process_journal_input(u, 1 + !!after_cursor);
416 }