]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-upload-journal.c
license: LGPL-2.1+ -> LGPL-2.1-or-later
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
CommitLineData
db9ecf05 1/* SPDX-License-Identifier: LGPL-2.1-or-later */
b5efdb8a 2
eacbb4d3 3#include <curl/curl.h>
cf0fbc49 4#include <stdbool.h>
eacbb4d3 5
0ab896b3
ZJS
6#include "sd-daemon.h"
7
b5efdb8a
LP
8#include "alloc-util.h"
9#include "journal-upload.h"
eacbb4d3 10#include "log.h"
0ab896b3 11#include "string-util.h"
eacbb4d3 12#include "utf8.h"
b5efdb8a 13#include "util.h"
eacbb4d3
ZJS
14
15/**
16 * Write up to size bytes to buf. Return negative on error, and number of
17 * bytes written otherwise. The last case is a kind of an error too.
18 */
19static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
20 int r;
21 size_t pos = 0;
22
23 assert(size <= SSIZE_MAX);
24
57255510 25 for (;;) {
eacbb4d3
ZJS
26
27 switch(u->entry_state) {
28 case ENTRY_CURSOR: {
a1e58e8e 29 u->current_cursor = mfree(u->current_cursor);
eacbb4d3 30
722b6795 31 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
eb56eb9b
MS
32 if (r < 0)
33 return log_error_errno(r, "Failed to get cursor: %m");
eacbb4d3
ZJS
34
35 r = snprintf(buf + pos, size - pos,
722b6795 36 "__CURSOR=%s\n", u->current_cursor);
91db8ed5
ZJS
37 assert(r >= 0);
38 if ((size_t) r > size - pos)
eacbb4d3
ZJS
39 /* not enough space */
40 return pos;
41
313cefa1 42 u->entry_state++;
eacbb4d3
ZJS
43
44 if (pos + r == size) {
45 /* exactly one character short, but we don't need it */
46 buf[size - 1] = '\n';
47 return size;
48 }
49
50 pos += r;
4831981d
SL
51 }
52 _fallthrough_;
eacbb4d3
ZJS
53 case ENTRY_REALTIME: {
54 usec_t realtime;
55
56 r = sd_journal_get_realtime_usec(u->journal, &realtime);
eb56eb9b
MS
57 if (r < 0)
58 return log_error_errno(r, "Failed to get realtime timestamp: %m");
eacbb4d3
ZJS
59
60 r = snprintf(buf + pos, size - pos,
61 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
91db8ed5
ZJS
62 assert(r >= 0);
63 if ((size_t) r > size - pos)
eacbb4d3
ZJS
64 /* not enough space */
65 return pos;
66
313cefa1 67 u->entry_state++;
eacbb4d3
ZJS
68
69 if (r + pos == size) {
70 /* exactly one character short, but we don't need it */
71 buf[size - 1] = '\n';
72 return size;
73 }
74
75 pos += r;
4831981d
SL
76 }
77 _fallthrough_;
eacbb4d3
ZJS
78 case ENTRY_MONOTONIC: {
79 usec_t monotonic;
80 sd_id128_t boot_id;
81
82 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
eb56eb9b
MS
83 if (r < 0)
84 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
85
86 r = snprintf(buf + pos, size - pos,
87 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
91db8ed5
ZJS
88 assert(r >= 0);
89 if ((size_t) r > size - pos)
eacbb4d3
ZJS
90 /* not enough space */
91 return pos;
92
313cefa1 93 u->entry_state++;
eacbb4d3
ZJS
94
95 if (r + pos == size) {
96 /* exactly one character short, but we don't need it */
97 buf[size - 1] = '\n';
98 return size;
99 }
100
101 pos += r;
4831981d
SL
102 }
103 _fallthrough_;
eacbb4d3
ZJS
104 case ENTRY_BOOT_ID: {
105 sd_id128_t boot_id;
5905d7cf 106 char sid[SD_ID128_STRING_MAX];
eacbb4d3
ZJS
107
108 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
eb56eb9b
MS
109 if (r < 0)
110 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
111
112 r = snprintf(buf + pos, size - pos,
113 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
91db8ed5
ZJS
114 assert(r >= 0);
115 if ((size_t) r > size - pos)
eacbb4d3
ZJS
116 /* not enough space */
117 return pos;
118
313cefa1 119 u->entry_state++;
eacbb4d3
ZJS
120
121 if (r + pos == size) {
122 /* exactly one character short, but we don't need it */
123 buf[size - 1] = '\n';
124 return size;
125 }
126
127 pos += r;
4831981d
SL
128 }
129 _fallthrough_;
eacbb4d3
ZJS
130 case ENTRY_NEW_FIELD: {
131 u->field_pos = 0;
132
133 r = sd_journal_enumerate_data(u->journal,
134 &u->field_data,
135 &u->field_length);
eb56eb9b
MS
136 if (r < 0)
137 return log_error_errno(r, "Failed to move to next field in entry: %m");
138 else if (r == 0) {
eacbb4d3
ZJS
139 u->entry_state = ENTRY_OUTRO;
140 continue;
141 }
142
0ab896b3
ZJS
143 /* We already printed the boot id from the data in
144 * the header, hence let's suppress it here */
145 if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
146 continue;
147
148 if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
eacbb4d3
ZJS
149 u->entry_state = ENTRY_BINARY_FIELD_START;
150 continue;
151 }
152
313cefa1 153 u->entry_state++;
4831981d
SL
154 }
155 _fallthrough_;
eacbb4d3
ZJS
156 case ENTRY_TEXT_FIELD:
157 case ENTRY_BINARY_FIELD: {
158 bool done;
159 size_t tocopy;
160
161 done = size - pos > u->field_length - u->field_pos;
162 if (done)
163 tocopy = u->field_length - u->field_pos;
164 else
165 tocopy = size - pos;
166
167 memcpy(buf + pos,
168 (char*) u->field_data + u->field_pos,
169 tocopy);
170
171 if (done) {
172 buf[pos + tocopy] = '\n';
173 pos += tocopy + 1;
174 u->entry_state = ENTRY_NEW_FIELD;
175 continue;
176 } else {
177 u->field_pos += tocopy;
178 return size;
179 }
180 }
181
182 case ENTRY_BINARY_FIELD_START: {
183 const char *c;
184 size_t len;
185
186 c = memchr(u->field_data, '=', u->field_length);
baaa35ad
ZJS
187 if (!c || c == u->field_data)
188 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
189 "Invalid field.");
eacbb4d3
ZJS
190
191 len = c - (const char*)u->field_data;
192
193 /* need space for label + '\n' */
194 if (size - pos < len + 1)
195 return pos;
196
197 memcpy(buf + pos, u->field_data, len);
198 buf[pos + len] = '\n';
199 pos += len + 1;
200
201 u->field_pos = len + 1;
313cefa1 202 u->entry_state++;
4831981d
SL
203 }
204 _fallthrough_;
eacbb4d3
ZJS
205 case ENTRY_BINARY_FIELD_SIZE: {
206 uint64_t le64;
207
208 /* need space for uint64_t */
209 if (size - pos < 8)
210 return pos;
211
212 le64 = htole64(u->field_length - u->field_pos);
213 memcpy(buf + pos, &le64, 8);
214 pos += 8;
215
313cefa1 216 u->entry_state++;
eacbb4d3
ZJS
217 continue;
218 }
219
220 case ENTRY_OUTRO:
221 /* need space for '\n' */
222 if (size - pos < 1)
223 return pos;
224
225 buf[pos++] = '\n';
313cefa1
VC
226 u->entry_state++;
227 u->entries_sent++;
eacbb4d3
ZJS
228
229 return pos;
230
231 default:
232 assert_not_reached("WTF?");
233 }
234 }
235 assert_not_reached("WTF?");
236}
237
a1e92eee 238static void check_update_watchdog(Uploader *u) {
d79ca7a6
KC
239 usec_t after;
240 usec_t elapsed_time;
241
0aa176a7 242 if (u->watchdog_usec <= 0)
d79ca7a6 243 return;
0aa176a7
ZJS
244
245 after = now(CLOCK_MONOTONIC);
54d8ef14 246 elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
0aa176a7
ZJS
247 if (elapsed_time > u->watchdog_usec / 2) {
248 log_debug("Update watchdog timer");
249 sd_notify(false, "WATCHDOG=1");
250 u->watchdog_timestamp = after;
d79ca7a6
KC
251 }
252}
253
eacbb4d3
ZJS
254static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
255 Uploader *u = userp;
256 int r;
257 sd_journal *j;
258 size_t filled = 0;
259 ssize_t w;
260
261 assert(u);
262 assert(nmemb <= SSIZE_MAX / size);
263
d79ca7a6
KC
264 check_update_watchdog(u);
265
eacbb4d3
ZJS
266 j = u->journal;
267
268 while (j && filled < size * nmemb) {
269 if (u->entry_state == ENTRY_DONE) {
270 r = sd_journal_next(j);
271 if (r < 0) {
c33b3297 272 log_error_errno(r, "Failed to move to next entry in journal: %m");
eacbb4d3
ZJS
273 return CURL_READFUNC_ABORT;
274 } else if (r == 0) {
275 if (u->input_event)
276 log_debug("No more entries, waiting for journal.");
277 else {
278 log_info("No more entries, closing journal.");
279 close_journal_input(u);
280 }
281
282 u->uploading = false;
283
284 break;
285 }
286
287 u->entry_state = ENTRY_CURSOR;
288 }
289
290 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
291 if (w < 0)
292 return CURL_READFUNC_ABORT;
293 filled += w;
294
295 if (filled == 0) {
296 log_error("Buffer space is too small to write entry.");
297 return CURL_READFUNC_ABORT;
298 } else if (u->entry_state != ENTRY_DONE)
299 /* This means that all available space was used up */
300 break;
301
302 log_debug("Entry %zu (%s) has been uploaded.",
722b6795 303 u->entries_sent, u->current_cursor);
eacbb4d3
ZJS
304 }
305
306 return filled;
307}
308
309void close_journal_input(Uploader *u) {
310 assert(u);
311
312 if (u->journal) {
313 log_debug("Closing journal input.");
314
315 sd_journal_close(u->journal);
316 u->journal = NULL;
317 }
318 u->timeout = 0;
319}
320
321static int process_journal_input(Uploader *u, int skip) {
322 int r;
323
8a3db16d
KC
324 if (u->uploading)
325 return 0;
326
eacbb4d3 327 r = sd_journal_next_skip(u->journal, skip);
eb56eb9b
MS
328 if (r < 0)
329 return log_error_errno(r, "Failed to skip to next entry: %m");
330 else if (r < skip)
eacbb4d3
ZJS
331 return 0;
332
333 /* have data */
334 u->entry_state = ENTRY_CURSOR;
335 return start_upload(u, journal_input_callback, u);
336}
337
338int check_journal_input(Uploader *u) {
339 if (u->input_event) {
340 int r;
341
342 r = sd_journal_process(u->journal);
343 if (r < 0) {
da927ba9 344 log_error_errno(r, "Failed to process journal: %m");
eacbb4d3
ZJS
345 close_journal_input(u);
346 return r;
347 }
348
349 if (r == SD_JOURNAL_NOP)
350 return 0;
351 }
352
353 return process_journal_input(u, 1);
354}
355
356static int dispatch_journal_input(sd_event_source *event,
357 int fd,
358 uint32_t revents,
359 void *userp) {
360 Uploader *u = userp;
361
362 assert(u);
363
8a3db16d 364 if (u->uploading)
eacbb4d3 365 return 0;
eacbb4d3
ZJS
366
367 log_debug("Detected journal input, checking for new data.");
368 return check_journal_input(u);
369}
370
371int open_journal_for_upload(Uploader *u,
372 sd_journal *j,
373 const char *cursor,
374 bool after_cursor,
375 bool follow) {
376 int fd, r, events;
377
378 u->journal = j;
379
380 sd_journal_set_data_threshold(j, 0);
381
382 if (follow) {
383 fd = sd_journal_get_fd(j);
eb56eb9b
MS
384 if (fd < 0)
385 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
eacbb4d3
ZJS
386
387 events = sd_journal_get_events(j);
388
389 r = sd_journal_reliable_fd(j);
390 assert(r >= 0);
391 if (r > 0)
392 u->timeout = -1;
393 else
394 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
395
396 r = sd_event_add_io(u->events, &u->input_event,
397 fd, events, dispatch_journal_input, u);
eb56eb9b
MS
398 if (r < 0)
399 return log_error_errno(r, "Failed to register input event: %m");
eacbb4d3
ZJS
400
401 log_debug("Listening for journal events on fd:%d, timeout %d",
402 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
403 } else
404 log_debug("Not listening for journal events.");
405
406 if (cursor) {
407 r = sd_journal_seek_cursor(j, cursor);
ece174c5 408 if (r < 0)
eb56eb9b
MS
409 return log_error_errno(r, "Failed to seek to cursor %s: %m",
410 cursor);
eacbb4d3
ZJS
411 }
412
bc48b25a 413 return process_journal_input(u, !!after_cursor);
eacbb4d3 414}