]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
Add SPDX license identifiers to source files under the LGPL
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2 /***
3 This file is part of systemd.
4
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
6
7 systemd is free software; you can redistribute it and/or modify it
8 under the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 2.1 of the License, or
10 (at your option) any later version.
11
12 systemd is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with systemd; If not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #include <curl/curl.h>
22 #include <stdbool.h>
23
24 #include "alloc-util.h"
25 #include "journal-upload.h"
26 #include "log.h"
27 #include "utf8.h"
28 #include "util.h"
29 #include "sd-daemon.h"
30
31 /**
32 * Write up to size bytes to buf. Return negative on error, and number of
33 * bytes written otherwise. The last case is a kind of an error too.
34 */
35 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
36 int r;
37 size_t pos = 0;
38
39 assert(size <= SSIZE_MAX);
40
41 for (;;) {
42
43 switch(u->entry_state) {
44 case ENTRY_CURSOR: {
45 u->current_cursor = mfree(u->current_cursor);
46
47 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
48 if (r < 0)
49 return log_error_errno(r, "Failed to get cursor: %m");
50
51 r = snprintf(buf + pos, size - pos,
52 "__CURSOR=%s\n", u->current_cursor);
53 if (pos + r > size)
54 /* not enough space */
55 return pos;
56
57 u->entry_state++;
58
59 if (pos + r == size) {
60 /* exactly one character short, but we don't need it */
61 buf[size - 1] = '\n';
62 return size;
63 }
64
65 pos += r;
66 } /* fall through */
67
68 case ENTRY_REALTIME: {
69 usec_t realtime;
70
71 r = sd_journal_get_realtime_usec(u->journal, &realtime);
72 if (r < 0)
73 return log_error_errno(r, "Failed to get realtime timestamp: %m");
74
75 r = snprintf(buf + pos, size - pos,
76 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
77 if (r + pos > size)
78 /* not enough space */
79 return pos;
80
81 u->entry_state++;
82
83 if (r + pos == size) {
84 /* exactly one character short, but we don't need it */
85 buf[size - 1] = '\n';
86 return size;
87 }
88
89 pos += r;
90 } /* fall through */
91
92 case ENTRY_MONOTONIC: {
93 usec_t monotonic;
94 sd_id128_t boot_id;
95
96 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
97 if (r < 0)
98 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
99
100 r = snprintf(buf + pos, size - pos,
101 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
102 if (r + pos > size)
103 /* not enough space */
104 return pos;
105
106 u->entry_state++;
107
108 if (r + pos == size) {
109 /* exactly one character short, but we don't need it */
110 buf[size - 1] = '\n';
111 return size;
112 }
113
114 pos += r;
115 } /* fall through */
116
117 case ENTRY_BOOT_ID: {
118 sd_id128_t boot_id;
119 char sid[33];
120
121 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
122 if (r < 0)
123 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
124
125 r = snprintf(buf + pos, size - pos,
126 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
127 if (r + pos > size)
128 /* not enough space */
129 return pos;
130
131 u->entry_state++;
132
133 if (r + pos == size) {
134 /* exactly one character short, but we don't need it */
135 buf[size - 1] = '\n';
136 return size;
137 }
138
139 pos += r;
140 } /* fall through */
141
142 case ENTRY_NEW_FIELD: {
143 u->field_pos = 0;
144
145 r = sd_journal_enumerate_data(u->journal,
146 &u->field_data,
147 &u->field_length);
148 if (r < 0)
149 return log_error_errno(r, "Failed to move to next field in entry: %m");
150 else if (r == 0) {
151 u->entry_state = ENTRY_OUTRO;
152 continue;
153 }
154
155 if (!utf8_is_printable_newline(u->field_data,
156 u->field_length, false)) {
157 u->entry_state = ENTRY_BINARY_FIELD_START;
158 continue;
159 }
160
161 u->entry_state++;
162 } /* fall through */
163
164 case ENTRY_TEXT_FIELD:
165 case ENTRY_BINARY_FIELD: {
166 bool done;
167 size_t tocopy;
168
169 done = size - pos > u->field_length - u->field_pos;
170 if (done)
171 tocopy = u->field_length - u->field_pos;
172 else
173 tocopy = size - pos;
174
175 memcpy(buf + pos,
176 (char*) u->field_data + u->field_pos,
177 tocopy);
178
179 if (done) {
180 buf[pos + tocopy] = '\n';
181 pos += tocopy + 1;
182 u->entry_state = ENTRY_NEW_FIELD;
183 continue;
184 } else {
185 u->field_pos += tocopy;
186 return size;
187 }
188 }
189
190 case ENTRY_BINARY_FIELD_START: {
191 const char *c;
192 size_t len;
193
194 c = memchr(u->field_data, '=', u->field_length);
195 if (!c || c == u->field_data) {
196 log_error("Invalid field.");
197 return -EINVAL;
198 }
199
200 len = c - (const char*)u->field_data;
201
202 /* need space for label + '\n' */
203 if (size - pos < len + 1)
204 return pos;
205
206 memcpy(buf + pos, u->field_data, len);
207 buf[pos + len] = '\n';
208 pos += len + 1;
209
210 u->field_pos = len + 1;
211 u->entry_state++;
212 } /* fall through */
213
214 case ENTRY_BINARY_FIELD_SIZE: {
215 uint64_t le64;
216
217 /* need space for uint64_t */
218 if (size - pos < 8)
219 return pos;
220
221 le64 = htole64(u->field_length - u->field_pos);
222 memcpy(buf + pos, &le64, 8);
223 pos += 8;
224
225 u->entry_state++;
226 continue;
227 }
228
229 case ENTRY_OUTRO:
230 /* need space for '\n' */
231 if (size - pos < 1)
232 return pos;
233
234 buf[pos++] = '\n';
235 u->entry_state++;
236 u->entries_sent++;
237
238 return pos;
239
240 default:
241 assert_not_reached("WTF?");
242 }
243 }
244 assert_not_reached("WTF?");
245 }
246
247 static inline void check_update_watchdog(Uploader *u) {
248 usec_t after;
249 usec_t elapsed_time;
250
251 if (u->watchdog_usec <= 0)
252 return;
253
254 after = now(CLOCK_MONOTONIC);
255 elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
256 if (elapsed_time > u->watchdog_usec / 2) {
257 log_debug("Update watchdog timer");
258 sd_notify(false, "WATCHDOG=1");
259 u->watchdog_timestamp = after;
260 }
261 }
262
263 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
264 Uploader *u = userp;
265 int r;
266 sd_journal *j;
267 size_t filled = 0;
268 ssize_t w;
269
270 assert(u);
271 assert(nmemb <= SSIZE_MAX / size);
272
273 check_update_watchdog(u);
274
275 j = u->journal;
276
277 while (j && filled < size * nmemb) {
278 if (u->entry_state == ENTRY_DONE) {
279 r = sd_journal_next(j);
280 if (r < 0) {
281 log_error_errno(r, "Failed to move to next entry in journal: %m");
282 return CURL_READFUNC_ABORT;
283 } else if (r == 0) {
284 if (u->input_event)
285 log_debug("No more entries, waiting for journal.");
286 else {
287 log_info("No more entries, closing journal.");
288 close_journal_input(u);
289 }
290
291 u->uploading = false;
292
293 break;
294 }
295
296 u->entry_state = ENTRY_CURSOR;
297 }
298
299 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
300 if (w < 0)
301 return CURL_READFUNC_ABORT;
302 filled += w;
303
304 if (filled == 0) {
305 log_error("Buffer space is too small to write entry.");
306 return CURL_READFUNC_ABORT;
307 } else if (u->entry_state != ENTRY_DONE)
308 /* This means that all available space was used up */
309 break;
310
311 log_debug("Entry %zu (%s) has been uploaded.",
312 u->entries_sent, u->current_cursor);
313 }
314
315 return filled;
316 }
317
318 void close_journal_input(Uploader *u) {
319 assert(u);
320
321 if (u->journal) {
322 log_debug("Closing journal input.");
323
324 sd_journal_close(u->journal);
325 u->journal = NULL;
326 }
327 u->timeout = 0;
328 }
329
330 static int process_journal_input(Uploader *u, int skip) {
331 int r;
332
333 if (u->uploading)
334 return 0;
335
336 r = sd_journal_next_skip(u->journal, skip);
337 if (r < 0)
338 return log_error_errno(r, "Failed to skip to next entry: %m");
339 else if (r < skip)
340 return 0;
341
342 /* have data */
343 u->entry_state = ENTRY_CURSOR;
344 return start_upload(u, journal_input_callback, u);
345 }
346
347 int check_journal_input(Uploader *u) {
348 if (u->input_event) {
349 int r;
350
351 r = sd_journal_process(u->journal);
352 if (r < 0) {
353 log_error_errno(r, "Failed to process journal: %m");
354 close_journal_input(u);
355 return r;
356 }
357
358 if (r == SD_JOURNAL_NOP)
359 return 0;
360 }
361
362 return process_journal_input(u, 1);
363 }
364
365 static int dispatch_journal_input(sd_event_source *event,
366 int fd,
367 uint32_t revents,
368 void *userp) {
369 Uploader *u = userp;
370
371 assert(u);
372
373 if (u->uploading)
374 return 0;
375
376 log_debug("Detected journal input, checking for new data.");
377 return check_journal_input(u);
378 }
379
380 int open_journal_for_upload(Uploader *u,
381 sd_journal *j,
382 const char *cursor,
383 bool after_cursor,
384 bool follow) {
385 int fd, r, events;
386
387 u->journal = j;
388
389 sd_journal_set_data_threshold(j, 0);
390
391 if (follow) {
392 fd = sd_journal_get_fd(j);
393 if (fd < 0)
394 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
395
396 events = sd_journal_get_events(j);
397
398 r = sd_journal_reliable_fd(j);
399 assert(r >= 0);
400 if (r > 0)
401 u->timeout = -1;
402 else
403 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
404
405 r = sd_event_add_io(u->events, &u->input_event,
406 fd, events, dispatch_journal_input, u);
407 if (r < 0)
408 return log_error_errno(r, "Failed to register input event: %m");
409
410 log_debug("Listening for journal events on fd:%d, timeout %d",
411 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
412 } else
413 log_debug("Not listening for journal events.");
414
415 if (cursor) {
416 r = sd_journal_seek_cursor(j, cursor);
417 if (r < 0)
418 return log_error_errno(r, "Failed to seek to cursor %s: %m",
419 cursor);
420 }
421
422 return process_journal_input(u, 1 + !!after_cursor);
423 }