]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-upload-journal.c
Add SPDX license identifiers to source files under the LGPL
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
CommitLineData
53e1b683 1/* SPDX-License-Identifier: LGPL-2.1+ */
b5efdb8a
LP
2/***
3 This file is part of systemd.
4
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
6
7 systemd is free software; you can redistribute it and/or modify it
8 under the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 2.1 of the License, or
10 (at your option) any later version.
11
12 systemd is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with systemd; If not, see <http://www.gnu.org/licenses/>.
19***/
20
eacbb4d3 21#include <curl/curl.h>
cf0fbc49 22#include <stdbool.h>
eacbb4d3 23
b5efdb8a
LP
24#include "alloc-util.h"
25#include "journal-upload.h"
eacbb4d3
ZJS
26#include "log.h"
27#include "utf8.h"
b5efdb8a 28#include "util.h"
d79ca7a6 29#include "sd-daemon.h"
eacbb4d3
ZJS
30
31/**
32 * Write up to size bytes to buf. Return negative on error, and number of
33 * bytes written otherwise. The last case is a kind of an error too.
34 */
35static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
36 int r;
37 size_t pos = 0;
38
39 assert(size <= SSIZE_MAX);
40
57255510 41 for (;;) {
eacbb4d3
ZJS
42
43 switch(u->entry_state) {
44 case ENTRY_CURSOR: {
a1e58e8e 45 u->current_cursor = mfree(u->current_cursor);
eacbb4d3 46
722b6795 47 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
eb56eb9b
MS
48 if (r < 0)
49 return log_error_errno(r, "Failed to get cursor: %m");
eacbb4d3
ZJS
50
51 r = snprintf(buf + pos, size - pos,
722b6795 52 "__CURSOR=%s\n", u->current_cursor);
eacbb4d3
ZJS
53 if (pos + r > size)
54 /* not enough space */
55 return pos;
56
313cefa1 57 u->entry_state++;
eacbb4d3
ZJS
58
59 if (pos + r == size) {
60 /* exactly one character short, but we don't need it */
61 buf[size - 1] = '\n';
62 return size;
63 }
64
65 pos += r;
66 } /* fall through */
67
68 case ENTRY_REALTIME: {
69 usec_t realtime;
70
71 r = sd_journal_get_realtime_usec(u->journal, &realtime);
eb56eb9b
MS
72 if (r < 0)
73 return log_error_errno(r, "Failed to get realtime timestamp: %m");
eacbb4d3
ZJS
74
75 r = snprintf(buf + pos, size - pos,
76 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
77 if (r + pos > size)
78 /* not enough space */
79 return pos;
80
313cefa1 81 u->entry_state++;
eacbb4d3
ZJS
82
83 if (r + pos == size) {
84 /* exactly one character short, but we don't need it */
85 buf[size - 1] = '\n';
86 return size;
87 }
88
89 pos += r;
90 } /* fall through */
91
92 case ENTRY_MONOTONIC: {
93 usec_t monotonic;
94 sd_id128_t boot_id;
95
96 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
eb56eb9b
MS
97 if (r < 0)
98 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
99
100 r = snprintf(buf + pos, size - pos,
101 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
102 if (r + pos > size)
103 /* not enough space */
104 return pos;
105
313cefa1 106 u->entry_state++;
eacbb4d3
ZJS
107
108 if (r + pos == size) {
109 /* exactly one character short, but we don't need it */
110 buf[size - 1] = '\n';
111 return size;
112 }
113
114 pos += r;
115 } /* fall through */
116
117 case ENTRY_BOOT_ID: {
118 sd_id128_t boot_id;
119 char sid[33];
120
121 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
eb56eb9b
MS
122 if (r < 0)
123 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
124
125 r = snprintf(buf + pos, size - pos,
126 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
5ffa8c81 127 if (r + pos > size)
eacbb4d3
ZJS
128 /* not enough space */
129 return pos;
130
313cefa1 131 u->entry_state++;
eacbb4d3
ZJS
132
133 if (r + pos == size) {
134 /* exactly one character short, but we don't need it */
135 buf[size - 1] = '\n';
136 return size;
137 }
138
139 pos += r;
140 } /* fall through */
141
142 case ENTRY_NEW_FIELD: {
143 u->field_pos = 0;
144
145 r = sd_journal_enumerate_data(u->journal,
146 &u->field_data,
147 &u->field_length);
eb56eb9b
MS
148 if (r < 0)
149 return log_error_errno(r, "Failed to move to next field in entry: %m");
150 else if (r == 0) {
eacbb4d3
ZJS
151 u->entry_state = ENTRY_OUTRO;
152 continue;
153 }
154
155 if (!utf8_is_printable_newline(u->field_data,
156 u->field_length, false)) {
157 u->entry_state = ENTRY_BINARY_FIELD_START;
158 continue;
159 }
160
313cefa1 161 u->entry_state++;
eacbb4d3
ZJS
162 } /* fall through */
163
164 case ENTRY_TEXT_FIELD:
165 case ENTRY_BINARY_FIELD: {
166 bool done;
167 size_t tocopy;
168
169 done = size - pos > u->field_length - u->field_pos;
170 if (done)
171 tocopy = u->field_length - u->field_pos;
172 else
173 tocopy = size - pos;
174
175 memcpy(buf + pos,
176 (char*) u->field_data + u->field_pos,
177 tocopy);
178
179 if (done) {
180 buf[pos + tocopy] = '\n';
181 pos += tocopy + 1;
182 u->entry_state = ENTRY_NEW_FIELD;
183 continue;
184 } else {
185 u->field_pos += tocopy;
186 return size;
187 }
188 }
189
190 case ENTRY_BINARY_FIELD_START: {
191 const char *c;
192 size_t len;
193
194 c = memchr(u->field_data, '=', u->field_length);
195 if (!c || c == u->field_data) {
196 log_error("Invalid field.");
197 return -EINVAL;
198 }
199
200 len = c - (const char*)u->field_data;
201
202 /* need space for label + '\n' */
203 if (size - pos < len + 1)
204 return pos;
205
206 memcpy(buf + pos, u->field_data, len);
207 buf[pos + len] = '\n';
208 pos += len + 1;
209
210 u->field_pos = len + 1;
313cefa1 211 u->entry_state++;
eacbb4d3
ZJS
212 } /* fall through */
213
214 case ENTRY_BINARY_FIELD_SIZE: {
215 uint64_t le64;
216
217 /* need space for uint64_t */
218 if (size - pos < 8)
219 return pos;
220
221 le64 = htole64(u->field_length - u->field_pos);
222 memcpy(buf + pos, &le64, 8);
223 pos += 8;
224
313cefa1 225 u->entry_state++;
eacbb4d3
ZJS
226 continue;
227 }
228
229 case ENTRY_OUTRO:
230 /* need space for '\n' */
231 if (size - pos < 1)
232 return pos;
233
234 buf[pos++] = '\n';
313cefa1
VC
235 u->entry_state++;
236 u->entries_sent++;
eacbb4d3
ZJS
237
238 return pos;
239
240 default:
241 assert_not_reached("WTF?");
242 }
243 }
244 assert_not_reached("WTF?");
245}
246
d79ca7a6 247static inline void check_update_watchdog(Uploader *u) {
d79ca7a6
KC
248 usec_t after;
249 usec_t elapsed_time;
250
0aa176a7 251 if (u->watchdog_usec <= 0)
d79ca7a6 252 return;
0aa176a7
ZJS
253
254 after = now(CLOCK_MONOTONIC);
54d8ef14 255 elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp);
0aa176a7
ZJS
256 if (elapsed_time > u->watchdog_usec / 2) {
257 log_debug("Update watchdog timer");
258 sd_notify(false, "WATCHDOG=1");
259 u->watchdog_timestamp = after;
d79ca7a6
KC
260 }
261}
262
eacbb4d3
ZJS
263static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
264 Uploader *u = userp;
265 int r;
266 sd_journal *j;
267 size_t filled = 0;
268 ssize_t w;
269
270 assert(u);
271 assert(nmemb <= SSIZE_MAX / size);
272
d79ca7a6
KC
273 check_update_watchdog(u);
274
eacbb4d3
ZJS
275 j = u->journal;
276
277 while (j && filled < size * nmemb) {
278 if (u->entry_state == ENTRY_DONE) {
279 r = sd_journal_next(j);
280 if (r < 0) {
c33b3297 281 log_error_errno(r, "Failed to move to next entry in journal: %m");
eacbb4d3
ZJS
282 return CURL_READFUNC_ABORT;
283 } else if (r == 0) {
284 if (u->input_event)
285 log_debug("No more entries, waiting for journal.");
286 else {
287 log_info("No more entries, closing journal.");
288 close_journal_input(u);
289 }
290
291 u->uploading = false;
292
293 break;
294 }
295
296 u->entry_state = ENTRY_CURSOR;
297 }
298
299 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
300 if (w < 0)
301 return CURL_READFUNC_ABORT;
302 filled += w;
303
304 if (filled == 0) {
305 log_error("Buffer space is too small to write entry.");
306 return CURL_READFUNC_ABORT;
307 } else if (u->entry_state != ENTRY_DONE)
308 /* This means that all available space was used up */
309 break;
310
311 log_debug("Entry %zu (%s) has been uploaded.",
722b6795 312 u->entries_sent, u->current_cursor);
eacbb4d3
ZJS
313 }
314
315 return filled;
316}
317
318void close_journal_input(Uploader *u) {
319 assert(u);
320
321 if (u->journal) {
322 log_debug("Closing journal input.");
323
324 sd_journal_close(u->journal);
325 u->journal = NULL;
326 }
327 u->timeout = 0;
328}
329
330static int process_journal_input(Uploader *u, int skip) {
331 int r;
332
8a3db16d
KC
333 if (u->uploading)
334 return 0;
335
eacbb4d3 336 r = sd_journal_next_skip(u->journal, skip);
eb56eb9b
MS
337 if (r < 0)
338 return log_error_errno(r, "Failed to skip to next entry: %m");
339 else if (r < skip)
eacbb4d3
ZJS
340 return 0;
341
342 /* have data */
343 u->entry_state = ENTRY_CURSOR;
344 return start_upload(u, journal_input_callback, u);
345}
346
347int check_journal_input(Uploader *u) {
348 if (u->input_event) {
349 int r;
350
351 r = sd_journal_process(u->journal);
352 if (r < 0) {
da927ba9 353 log_error_errno(r, "Failed to process journal: %m");
eacbb4d3
ZJS
354 close_journal_input(u);
355 return r;
356 }
357
358 if (r == SD_JOURNAL_NOP)
359 return 0;
360 }
361
362 return process_journal_input(u, 1);
363}
364
365static int dispatch_journal_input(sd_event_source *event,
366 int fd,
367 uint32_t revents,
368 void *userp) {
369 Uploader *u = userp;
370
371 assert(u);
372
8a3db16d 373 if (u->uploading)
eacbb4d3 374 return 0;
eacbb4d3
ZJS
375
376 log_debug("Detected journal input, checking for new data.");
377 return check_journal_input(u);
378}
379
380int open_journal_for_upload(Uploader *u,
381 sd_journal *j,
382 const char *cursor,
383 bool after_cursor,
384 bool follow) {
385 int fd, r, events;
386
387 u->journal = j;
388
389 sd_journal_set_data_threshold(j, 0);
390
391 if (follow) {
392 fd = sd_journal_get_fd(j);
eb56eb9b
MS
393 if (fd < 0)
394 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
eacbb4d3
ZJS
395
396 events = sd_journal_get_events(j);
397
398 r = sd_journal_reliable_fd(j);
399 assert(r >= 0);
400 if (r > 0)
401 u->timeout = -1;
402 else
403 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
404
405 r = sd_event_add_io(u->events, &u->input_event,
406 fd, events, dispatch_journal_input, u);
eb56eb9b
MS
407 if (r < 0)
408 return log_error_errno(r, "Failed to register input event: %m");
eacbb4d3
ZJS
409
410 log_debug("Listening for journal events on fd:%d, timeout %d",
411 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
412 } else
413 log_debug("Not listening for journal events.");
414
415 if (cursor) {
416 r = sd_journal_seek_cursor(j, cursor);
ece174c5 417 if (r < 0)
eb56eb9b
MS
418 return log_error_errno(r, "Failed to seek to cursor %s: %m",
419 cursor);
eacbb4d3
ZJS
420 }
421
422 return process_journal_input(u, 1 + !!after_cursor);
423}