]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
pkgconfig: define variables relative to ${prefix}/${rootprefix}/${sysconfdir}
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2
3 #include <curl/curl.h>
4 #include <stdbool.h>
5
6 #include "sd-daemon.h"
7
8 #include "alloc-util.h"
9 #include "journal-upload.h"
10 #include "log.h"
11 #include "string-util.h"
12 #include "utf8.h"
13 #include "util.h"
14
15 /**
16 * Write up to size bytes to buf. Return negative on error, and number of
17 * bytes written otherwise. The last case is a kind of an error too.
18 */
19 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
20 int r;
21 size_t pos = 0;
22
23 assert(size <= SSIZE_MAX);
24
25 for (;;) {
26
27 switch(u->entry_state) {
28 case ENTRY_CURSOR: {
29 u->current_cursor = mfree(u->current_cursor);
30
31 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
32 if (r < 0)
33 return log_error_errno(r, "Failed to get cursor: %m");
34
35 r = snprintf(buf + pos, size - pos,
36 "__CURSOR=%s\n", u->current_cursor);
37 assert(r >= 0);
38 if ((size_t) r > size - pos)
39 /* not enough space */
40 return pos;
41
42 u->entry_state++;
43
44 if (pos + r == size) {
45 /* exactly one character short, but we don't need it */
46 buf[size - 1] = '\n';
47 return size;
48 }
49
50 pos += r;
51 }
52 _fallthrough_;
53 case ENTRY_REALTIME: {
54 usec_t realtime;
55
56 r = sd_journal_get_realtime_usec(u->journal, &realtime);
57 if (r < 0)
58 return log_error_errno(r, "Failed to get realtime timestamp: %m");
59
60 r = snprintf(buf + pos, size - pos,
61 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
62 assert(r >= 0);
63 if ((size_t) r > size - pos)
64 /* not enough space */
65 return pos;
66
67 u->entry_state++;
68
69 if (r + pos == size) {
70 /* exactly one character short, but we don't need it */
71 buf[size - 1] = '\n';
72 return size;
73 }
74
75 pos += r;
76 }
77 _fallthrough_;
78 case ENTRY_MONOTONIC: {
79 usec_t monotonic;
80 sd_id128_t boot_id;
81
82 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
83 if (r < 0)
84 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
85
86 r = snprintf(buf + pos, size - pos,
87 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
88 assert(r >= 0);
89 if ((size_t) r > size - pos)
90 /* not enough space */
91 return pos;
92
93 u->entry_state++;
94
95 if (r + pos == size) {
96 /* exactly one character short, but we don't need it */
97 buf[size - 1] = '\n';
98 return size;
99 }
100
101 pos += r;
102 }
103 _fallthrough_;
104 case ENTRY_BOOT_ID: {
105 sd_id128_t boot_id;
106 char sid[33];
107
108 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
109 if (r < 0)
110 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
111
112 r = snprintf(buf + pos, size - pos,
113 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
114 assert(r >= 0);
115 if ((size_t) r > size - pos)
116 /* not enough space */
117 return pos;
118
119 u->entry_state++;
120
121 if (r + pos == size) {
122 /* exactly one character short, but we don't need it */
123 buf[size - 1] = '\n';
124 return size;
125 }
126
127 pos += r;
128 }
129 _fallthrough_;
130 case ENTRY_NEW_FIELD: {
131 u->field_pos = 0;
132
133 r = sd_journal_enumerate_data(u->journal,
134 &u->field_data,
135 &u->field_length);
136 if (r < 0)
137 return log_error_errno(r, "Failed to move to next field in entry: %m");
138 else if (r == 0) {
139 u->entry_state = ENTRY_OUTRO;
140 continue;
141 }
142
143 /* We already printed the boot id from the data in
144 * the header, hence let's suppress it here */
145 if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID="))
146 continue;
147
148 if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) {
149 u->entry_state = ENTRY_BINARY_FIELD_START;
150 continue;
151 }
152
153 u->entry_state++;
154 }
155 _fallthrough_;
156 case ENTRY_TEXT_FIELD:
157 case ENTRY_BINARY_FIELD: {
158 bool done;
159 size_t tocopy;
160
161 done = size - pos > u->field_length - u->field_pos;
162 if (done)
163 tocopy = u->field_length - u->field_pos;
164 else
165 tocopy = size - pos;
166
167 memcpy(buf + pos,
168 (char*) u->field_data + u->field_pos,
169 tocopy);
170
171 if (done) {
172 buf[pos + tocopy] = '\n';
173 pos += tocopy + 1;
174 u->entry_state = ENTRY_NEW_FIELD;
175 continue;
176 } else {
177 u->field_pos += tocopy;
178 return size;
179 }
180 }
181
182 case ENTRY_BINARY_FIELD_START: {
183 const char *c;
184 size_t len;
185
186 c = memchr(u->field_data, '=', u->field_length);
187 if (!c || c == u->field_data) {
188 log_error("Invalid field.");
189 return -EINVAL;
190 }
191
192 len = c - (const char*)u->field_data;
193
194 /* need space for label + '\n' */
195 if (size - pos < len + 1)
196 return pos;
197
198 memcpy(buf + pos, u->field_data, len);
199 buf[pos + len] = '\n';
200 pos += len + 1;
201
202 u->field_pos = len + 1;
203 u->entry_state++;
204 }
205 _fallthrough_;
206 case ENTRY_BINARY_FIELD_SIZE: {
207 uint64_t le64;
208
209 /* need space for uint64_t */
210 if (size - pos < 8)
211 return pos;
212
213 le64 = htole64(u->field_length - u->field_pos);
214 memcpy(buf + pos, &le64, 8);
215 pos += 8;
216
217 u->entry_state++;
218 continue;
219 }
220
221 case ENTRY_OUTRO:
222 /* need space for '\n' */
223 if (size - pos < 1)
224 return pos;
225
226 buf[pos++] = '\n';
227 u->entry_state++;
228 u->entries_sent++;
229
230 return pos;
231
232 default:
233 assert_not_reached("WTF?");
234 }
235 }
236 assert_not_reached("WTF?");
237 }
238
239 static inline void check_update_watchdog(Uploader *u) {
240 usec_t after;
241 usec_t elapsed_time;
242
243 if (u->watchdog_usec <= 0)
244 return;
245
246 after = now(CLOCK_MONOTONIC);
247 elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
248 if (elapsed_time > u->watchdog_usec / 2) {
249 log_debug("Update watchdog timer");
250 sd_notify(false, "WATCHDOG=1");
251 u->watchdog_timestamp = after;
252 }
253 }
254
255 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
256 Uploader *u = userp;
257 int r;
258 sd_journal *j;
259 size_t filled = 0;
260 ssize_t w;
261
262 assert(u);
263 assert(nmemb <= SSIZE_MAX / size);
264
265 check_update_watchdog(u);
266
267 j = u->journal;
268
269 while (j && filled < size * nmemb) {
270 if (u->entry_state == ENTRY_DONE) {
271 r = sd_journal_next(j);
272 if (r < 0) {
273 log_error_errno(r, "Failed to move to next entry in journal: %m");
274 return CURL_READFUNC_ABORT;
275 } else if (r == 0) {
276 if (u->input_event)
277 log_debug("No more entries, waiting for journal.");
278 else {
279 log_info("No more entries, closing journal.");
280 close_journal_input(u);
281 }
282
283 u->uploading = false;
284
285 break;
286 }
287
288 u->entry_state = ENTRY_CURSOR;
289 }
290
291 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
292 if (w < 0)
293 return CURL_READFUNC_ABORT;
294 filled += w;
295
296 if (filled == 0) {
297 log_error("Buffer space is too small to write entry.");
298 return CURL_READFUNC_ABORT;
299 } else if (u->entry_state != ENTRY_DONE)
300 /* This means that all available space was used up */
301 break;
302
303 log_debug("Entry %zu (%s) has been uploaded.",
304 u->entries_sent, u->current_cursor);
305 }
306
307 return filled;
308 }
309
310 void close_journal_input(Uploader *u) {
311 assert(u);
312
313 if (u->journal) {
314 log_debug("Closing journal input.");
315
316 sd_journal_close(u->journal);
317 u->journal = NULL;
318 }
319 u->timeout = 0;
320 }
321
322 static int process_journal_input(Uploader *u, int skip) {
323 int r;
324
325 if (u->uploading)
326 return 0;
327
328 r = sd_journal_next_skip(u->journal, skip);
329 if (r < 0)
330 return log_error_errno(r, "Failed to skip to next entry: %m");
331 else if (r < skip)
332 return 0;
333
334 /* have data */
335 u->entry_state = ENTRY_CURSOR;
336 return start_upload(u, journal_input_callback, u);
337 }
338
339 int check_journal_input(Uploader *u) {
340 if (u->input_event) {
341 int r;
342
343 r = sd_journal_process(u->journal);
344 if (r < 0) {
345 log_error_errno(r, "Failed to process journal: %m");
346 close_journal_input(u);
347 return r;
348 }
349
350 if (r == SD_JOURNAL_NOP)
351 return 0;
352 }
353
354 return process_journal_input(u, 1);
355 }
356
357 static int dispatch_journal_input(sd_event_source *event,
358 int fd,
359 uint32_t revents,
360 void *userp) {
361 Uploader *u = userp;
362
363 assert(u);
364
365 if (u->uploading)
366 return 0;
367
368 log_debug("Detected journal input, checking for new data.");
369 return check_journal_input(u);
370 }
371
372 int open_journal_for_upload(Uploader *u,
373 sd_journal *j,
374 const char *cursor,
375 bool after_cursor,
376 bool follow) {
377 int fd, r, events;
378
379 u->journal = j;
380
381 sd_journal_set_data_threshold(j, 0);
382
383 if (follow) {
384 fd = sd_journal_get_fd(j);
385 if (fd < 0)
386 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
387
388 events = sd_journal_get_events(j);
389
390 r = sd_journal_reliable_fd(j);
391 assert(r >= 0);
392 if (r > 0)
393 u->timeout = -1;
394 else
395 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
396
397 r = sd_event_add_io(u->events, &u->input_event,
398 fd, events, dispatch_journal_input, u);
399 if (r < 0)
400 return log_error_errno(r, "Failed to register input event: %m");
401
402 log_debug("Listening for journal events on fd:%d, timeout %d",
403 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
404 } else
405 log_debug("Not listening for journal events.");
406
407 if (cursor) {
408 r = sd_journal_seek_cursor(j, cursor);
409 if (r < 0)
410 return log_error_errno(r, "Failed to seek to cursor %s: %m",
411 cursor);
412 }
413
414 return process_journal_input(u, 1 + !!after_cursor);
415 }