]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
tree-wide: beautify remaining copyright statements
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2 /***
3 Copyright © 2014 Zbigniew Jędrzejewski-Szmek
4 ***/
5
6 #include <curl/curl.h>
7 #include <stdbool.h>
8
9 #include "sd-daemon.h"
10
11 #include "alloc-util.h"
12 #include "journal-upload.h"
13 #include "log.h"
14 #include "string-util.h"
15 #include "utf8.h"
16 #include "util.h"
17
18 /**
19 * Write up to size bytes to buf. Return negative on error, and number of
20 * bytes written otherwise. The last case is a kind of an error too.
21 */
22 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
23 int r;
24 size_t pos = 0;
25
26 assert(size <= SSIZE_MAX);
27
28 for (;;) {
29
30 switch(u->entry_state) {
31 case ENTRY_CURSOR: {
32 u->current_cursor = mfree(u->current_cursor);
33
34 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
35 if (r < 0)
36 return log_error_errno(r, "Failed to get cursor: %m");
37
38 r = snprintf(buf + pos, size - pos,
39 "__CURSOR=%s\n", u->current_cursor);
40 if (pos + r > size)
41 /* not enough space */
42 return pos;
43
44 u->entry_state++;
45
46 if (pos + r == size) {
47 /* exactly one character short, but we don't need it */
48 buf[size - 1] = '\n';
49 return size;
50 }
51
52 pos += r;
53 }
54 _fallthrough_;
55 case ENTRY_REALTIME: {
56 usec_t realtime;
57
58 r = sd_journal_get_realtime_usec(u->journal, &realtime);
59 if (r < 0)
60 return log_error_errno(r, "Failed to get realtime timestamp: %m");
61
62 r = snprintf(buf + pos, size - pos,
63 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
64 if (r + pos > size)
65 /* not enough space */
66 return pos;
67
68 u->entry_state++;
69
70 if (r + pos == size) {
71 /* exactly one character short, but we don't need it */
72 buf[size - 1] = '\n';
73 return size;
74 }
75
76 pos += r;
77 }
78 _fallthrough_;
79 case ENTRY_MONOTONIC: {
80 usec_t monotonic;
81 sd_id128_t boot_id;
82
83 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
84 if (r < 0)
85 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
86
87 r = snprintf(buf + pos, size - pos,
88 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
89 if (r + pos > size)
90 /* not enough space */
91 return pos;
92
93 u->entry_state++;
94
95 if (r + pos == size) {
96 /* exactly one character short, but we don't need it */
97 buf[size - 1] = '\n';
98 return size;
99 }
100
101 pos += r;
102 }
103 _fallthrough_;
104 case ENTRY_BOOT_ID: {
105 sd_id128_t boot_id;
106 char sid[33];
107
108 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
109 if (r < 0)
110 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
111
112 r = snprintf(buf + pos, size - pos,
113 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
114 if (r + pos > size)
115 /* not enough space */
116 return pos;
117
118 u->entry_state++;
119
120 if (r + pos == size) {
121 /* exactly one character short, but we don't need it */
122 buf[size - 1] = '\n';
123 return size;
124 }
125
126 pos += r;
127 }
128 _fallthrough_;
129 case ENTRY_NEW_FIELD: {
130 u->field_pos = 0;
131
132 r = sd_journal_enumerate_data(u->journal,
133 &u->field_data,
134 &u->field_length);
135 if (r < 0)
136 return log_error_errno(r, "Failed to move to next field in entry: %m");
137 else if (r == 0) {
138 u->entry_state = ENTRY_OUTRO;
139 continue;
140 }
141
142 /* We already printed the boot id from the data in
143 * the header, hence let's suppress it here */
144 if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
145 continue;
146
147 if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
148 u->entry_state = ENTRY_BINARY_FIELD_START;
149 continue;
150 }
151
152 u->entry_state++;
153 }
154 _fallthrough_;
155 case ENTRY_TEXT_FIELD:
156 case ENTRY_BINARY_FIELD: {
157 bool done;
158 size_t tocopy;
159
160 done = size - pos > u->field_length - u->field_pos;
161 if (done)
162 tocopy = u->field_length - u->field_pos;
163 else
164 tocopy = size - pos;
165
166 memcpy(buf + pos,
167 (char*) u->field_data + u->field_pos,
168 tocopy);
169
170 if (done) {
171 buf[pos + tocopy] = '\n';
172 pos += tocopy + 1;
173 u->entry_state = ENTRY_NEW_FIELD;
174 continue;
175 } else {
176 u->field_pos += tocopy;
177 return size;
178 }
179 }
180
181 case ENTRY_BINARY_FIELD_START: {
182 const char *c;
183 size_t len;
184
185 c = memchr(u->field_data, '=', u->field_length);
186 if (!c || c == u->field_data) {
187 log_error("Invalid field.");
188 return -EINVAL;
189 }
190
191 len = c - (const char*)u->field_data;
192
193 /* need space for label + '\n' */
194 if (size - pos < len + 1)
195 return pos;
196
197 memcpy(buf + pos, u->field_data, len);
198 buf[pos + len] = '\n';
199 pos += len + 1;
200
201 u->field_pos = len + 1;
202 u->entry_state++;
203 }
204 _fallthrough_;
205 case ENTRY_BINARY_FIELD_SIZE: {
206 uint64_t le64;
207
208 /* need space for uint64_t */
209 if (size - pos < 8)
210 return pos;
211
212 le64 = htole64(u->field_length - u->field_pos);
213 memcpy(buf + pos, &le64, 8);
214 pos += 8;
215
216 u->entry_state++;
217 continue;
218 }
219
220 case ENTRY_OUTRO:
221 /* need space for '\n' */
222 if (size - pos < 1)
223 return pos;
224
225 buf[pos++] = '\n';
226 u->entry_state++;
227 u->entries_sent++;
228
229 return pos;
230
231 default:
232 assert_not_reached("WTF?");
233 }
234 }
235 assert_not_reached("WTF?");
236 }
237
238 static inline void check_update_watchdog(Uploader *u) {
239 usec_t after;
240 usec_t elapsed_time;
241
242 if (u->watchdog_usec <= 0)
243 return;
244
245 after = now(CLOCK_MONOTONIC);
246 elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
247 if (elapsed_time > u->watchdog_usec / 2) {
248 log_debug("Update watchdog timer");
249 sd_notify(false, "WATCHDOG=1");
250 u->watchdog_timestamp = after;
251 }
252 }
253
254 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
255 Uploader *u = userp;
256 int r;
257 sd_journal *j;
258 size_t filled = 0;
259 ssize_t w;
260
261 assert(u);
262 assert(nmemb <= SSIZE_MAX / size);
263
264 check_update_watchdog(u);
265
266 j = u->journal;
267
268 while (j && filled < size * nmemb) {
269 if (u->entry_state == ENTRY_DONE) {
270 r = sd_journal_next(j);
271 if (r < 0) {
272 log_error_errno(r, "Failed to move to next entry in journal: %m");
273 return CURL_READFUNC_ABORT;
274 } else if (r == 0) {
275 if (u->input_event)
276 log_debug("No more entries, waiting for journal.");
277 else {
278 log_info("No more entries, closing journal.");
279 close_journal_input(u);
280 }
281
282 u->uploading = false;
283
284 break;
285 }
286
287 u->entry_state = ENTRY_CURSOR;
288 }
289
290 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
291 if (w < 0)
292 return CURL_READFUNC_ABORT;
293 filled += w;
294
295 if (filled == 0) {
296 log_error("Buffer space is too small to write entry.");
297 return CURL_READFUNC_ABORT;
298 } else if (u->entry_state != ENTRY_DONE)
299 /* This means that all available space was used up */
300 break;
301
302 log_debug("Entry %zu (%s) has been uploaded.",
303 u->entries_sent, u->current_cursor);
304 }
305
306 return filled;
307 }
308
309 void close_journal_input(Uploader *u) {
310 assert(u);
311
312 if (u->journal) {
313 log_debug("Closing journal input.");
314
315 sd_journal_close(u->journal);
316 u->journal = NULL;
317 }
318 u->timeout = 0;
319 }
320
321 static int process_journal_input(Uploader *u, int skip) {
322 int r;
323
324 if (u->uploading)
325 return 0;
326
327 r = sd_journal_next_skip(u->journal, skip);
328 if (r < 0)
329 return log_error_errno(r, "Failed to skip to next entry: %m");
330 else if (r < skip)
331 return 0;
332
333 /* have data */
334 u->entry_state = ENTRY_CURSOR;
335 return start_upload(u, journal_input_callback, u);
336 }
337
338 int check_journal_input(Uploader *u) {
339 if (u->input_event) {
340 int r;
341
342 r = sd_journal_process(u->journal);
343 if (r < 0) {
344 log_error_errno(r, "Failed to process journal: %m");
345 close_journal_input(u);
346 return r;
347 }
348
349 if (r == SD_JOURNAL_NOP)
350 return 0;
351 }
352
353 return process_journal_input(u, 1);
354 }
355
356 static int dispatch_journal_input(sd_event_source *event,
357 int fd,
358 uint32_t revents,
359 void *userp) {
360 Uploader *u = userp;
361
362 assert(u);
363
364 if (u->uploading)
365 return 0;
366
367 log_debug("Detected journal input, checking for new data.");
368 return check_journal_input(u);
369 }
370
371 int open_journal_for_upload(Uploader *u,
372 sd_journal *j,
373 const char *cursor,
374 bool after_cursor,
375 bool follow) {
376 int fd, r, events;
377
378 u->journal = j;
379
380 sd_journal_set_data_threshold(j, 0);
381
382 if (follow) {
383 fd = sd_journal_get_fd(j);
384 if (fd < 0)
385 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
386
387 events = sd_journal_get_events(j);
388
389 r = sd_journal_reliable_fd(j);
390 assert(r >= 0);
391 if (r > 0)
392 u->timeout = -1;
393 else
394 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
395
396 r = sd_event_add_io(u->events, &u->input_event,
397 fd, events, dispatch_journal_input, u);
398 if (r < 0)
399 return log_error_errno(r, "Failed to register input event: %m");
400
401 log_debug("Listening for journal events on fd:%d, timeout %d",
402 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
403 } else
404 log_debug("Not listening for journal events.");
405
406 if (cursor) {
407 r = sd_journal_seek_cursor(j, cursor);
408 if (r < 0)
409 return log_error_errno(r, "Failed to seek to cursor %s: %m",
410 cursor);
411 }
412
413 return process_journal_input(u, 1 + !!after_cursor);
414 }