]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/shared/journal-importer.c
mkosi: update arch commit reference
[thirdparty/systemd.git] / src / shared / journal-importer.c
1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
2
3 #include <errno.h>
4 #include <malloc.h>
5 #include <unistd.h>
6
7 #include "alloc-util.h"
8 #include "errno-util.h"
9 #include "escape.h"
10 #include "fd-util.h"
11 #include "io-util.h"
12 #include "journal-file.h"
13 #include "journal-importer.h"
14 #include "journal-util.h"
15 #include "parse-util.h"
16 #include "string-util.h"
17 #include "strv.h"
18 #include "unaligned.h"
19
20 enum {
21 IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
22 IMPORTER_STATE_DATA_START, /* reading binary data header */
23 IMPORTER_STATE_DATA, /* reading binary data */
24 IMPORTER_STATE_DATA_FINISH, /* expecting newline */
25 IMPORTER_STATE_EOF, /* done */
26 };
27
28 void journal_importer_cleanup(JournalImporter *imp) {
29 if (imp->fd >= 0 && !imp->passive_fd) {
30 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
31 safe_close(imp->fd);
32 }
33
34 free(imp->name);
35 free(imp->buf);
36 iovw_free_contents(&imp->iovw, false);
37 }
38
39 static char* realloc_buffer(JournalImporter *imp, size_t size) {
40 char *b, *old = ASSERT_PTR(imp)->buf;
41
42 b = GREEDY_REALLOC(imp->buf, size);
43 if (!b)
44 return NULL;
45
46 iovw_rebase(&imp->iovw, old, imp->buf);
47
48 return b;
49 }
50
51 static int get_line(JournalImporter *imp, char **line, size_t *size) {
52 ssize_t n;
53 char *c = NULL;
54
55 assert(imp);
56 assert(imp->state == IMPORTER_STATE_LINE);
57 assert(imp->offset <= imp->filled);
58 assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
59 assert(imp->fd >= 0);
60
61 for (;;) {
62 if (imp->buf) {
63 size_t start = MAX(imp->scanned, imp->offset);
64
65 c = memchr(imp->buf + start, '\n',
66 imp->filled - start);
67 if (c)
68 break;
69 }
70
71 imp->scanned = imp->filled;
72 if (imp->scanned >= DATA_SIZE_MAX)
73 return log_warning_errno(SYNTHETIC_ERRNO(ENOBUFS),
74 "Entry is bigger than %u bytes.",
75 DATA_SIZE_MAX);
76
77 if (imp->passive_fd)
78 /* we have to wait for some data to come to us */
79 return -EAGAIN;
80
81 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
82 we reallocate it, we'll increase the size at least a bit. */
83 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
84 if (MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled < LINE_CHUNK &&
85 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
86 return log_oom();
87
88 assert(imp->buf);
89 assert(MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled >= LINE_CHUNK ||
90 MALLOC_SIZEOF_SAFE(imp->buf) >= ENTRY_SIZE_MAX);
91
92 n = read(imp->fd,
93 imp->buf + imp->filled,
94 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
95 if (n < 0) {
96 if (ERRNO_IS_DISCONNECT(errno)) {
97 log_debug_errno(errno, "Got disconnect for importer %s.", strna(imp->name));
98 return 0;
99 }
100
101 if (!ERRNO_IS_TRANSIENT(errno))
102 log_error_errno(errno, "read(%d, ..., %zu): %m",
103 imp->fd,
104 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
105 return -errno;
106 } else if (n == 0)
107 return 0;
108
109 imp->filled += n;
110 }
111
112 *line = imp->buf + imp->offset;
113 *size = c + 1 - imp->buf - imp->offset;
114 imp->offset += *size;
115
116 return 1;
117 }
118
119 static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
120
121 assert(imp);
122 assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
123 assert(size <= DATA_SIZE_MAX);
124 assert(imp->offset <= imp->filled);
125 assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
126 assert(imp->fd >= 0);
127 assert(data);
128
129 while (imp->filled - imp->offset < size) {
130 int n;
131
132 if (imp->passive_fd)
133 /* we have to wait for some data to come to us */
134 return -EAGAIN;
135
136 if (!realloc_buffer(imp, imp->offset + size))
137 return log_oom();
138
139 n = read(imp->fd, imp->buf + imp->filled,
140 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
141 if (n < 0) {
142 if (ERRNO_IS_DISCONNECT(errno)) {
143 log_debug_errno(errno, "Got disconnect for importer %s.", strna(imp->name));
144 return 0;
145 }
146
147 if (!ERRNO_IS_TRANSIENT(errno))
148 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
149 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
150 return -errno;
151 } else if (n == 0)
152 return 0;
153
154 imp->filled += n;
155 }
156
157 *data = imp->buf + imp->offset;
158 imp->offset += size;
159
160 return 1;
161 }
162
163 static int get_data_size(JournalImporter *imp) {
164 int r;
165 void *data;
166
167 assert(imp);
168 assert(imp->state == IMPORTER_STATE_DATA_START);
169 assert(imp->data_size == 0);
170
171 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
172 if (r <= 0)
173 return r;
174
175 imp->data_size = unaligned_read_le64(data);
176 if (imp->data_size > DATA_SIZE_MAX)
177 return log_warning_errno(SYNTHETIC_ERRNO(EINVAL),
178 "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
179 imp->data_size, DATA_SIZE_MAX);
180 if (imp->data_size == 0)
181 log_warning("Binary field with zero length");
182
183 return 1;
184 }
185
186 static int get_data_data(JournalImporter *imp, void **data) {
187 int r;
188
189 assert(imp);
190 assert(data);
191 assert(imp->state == IMPORTER_STATE_DATA);
192
193 r = fill_fixed_size(imp, data, imp->data_size);
194 if (r <= 0)
195 return r;
196
197 return 1;
198 }
199
200 static int get_data_newline(JournalImporter *imp) {
201 int r;
202 char *data;
203
204 assert(imp);
205 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
206
207 r = fill_fixed_size(imp, (void**) &data, 1);
208 if (r <= 0)
209 return r;
210
211 assert(data);
212 if (*data != '\n') {
213 char buf[4];
214 int l;
215
216 l = cescape_char(*data, buf);
217 return log_warning_errno(SYNTHETIC_ERRNO(EINVAL),
218 "Expected newline, got '%.*s'", l, buf);
219 }
220
221 return 1;
222 }
223
224 static int process_special_field(JournalImporter *imp, char *line) {
225 const char *value;
226 char buf[CELLESCAPE_DEFAULT_LENGTH];
227 int r;
228
229 assert(line);
230
231 if (STARTSWITH_SET(line, "__CURSOR=", "__SEQNUM=", "__SEQNUM_ID="))
232 /* ignore __CURSOR=, __SEQNUM=, __SEQNUM_ID= which we cannot replicate */
233 return 1;
234
235 value = startswith(line, "__REALTIME_TIMESTAMP=");
236 if (value) {
237 uint64_t x;
238
239 r = safe_atou64(value, &x);
240 if (r < 0)
241 return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
242 cellescape(buf, sizeof buf, value));
243 else if (!VALID_REALTIME(x)) {
244 log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, x);
245 return -ERANGE;
246 }
247
248 imp->ts.realtime = x;
249 return 1;
250 }
251
252 value = startswith(line, "__MONOTONIC_TIMESTAMP=");
253 if (value) {
254 uint64_t x;
255
256 r = safe_atou64(value, &x);
257 if (r < 0)
258 return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
259 cellescape(buf, sizeof buf, value));
260 else if (!VALID_MONOTONIC(x)) {
261 log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, x);
262 return -ERANGE;
263 }
264
265 imp->ts.monotonic = x;
266 return 1;
267 }
268
269 /* Just a single underline, but it needs special treatment too. */
270 value = startswith(line, "_BOOT_ID=");
271 if (value) {
272 r = sd_id128_from_string(value, &imp->boot_id);
273 if (r < 0)
274 return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
275 cellescape(buf, sizeof buf, value));
276
277 /* store the field in the usual fashion too */
278 return 0;
279 }
280
281 value = startswith(line, "__");
282 if (value) {
283 log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf, sizeof buf, value));
284 return 1;
285 }
286
287 /* no dunder */
288 return 0;
289 }
290
291 int journal_importer_process_data(JournalImporter *imp) {
292 int r;
293
294 switch (imp->state) {
295 case IMPORTER_STATE_LINE: {
296 char *line, *sep;
297 size_t n = 0;
298
299 assert(imp->data_size == 0);
300
301 r = get_line(imp, &line, &n);
302 if (r < 0)
303 return r;
304 if (r == 0) {
305 imp->state = IMPORTER_STATE_EOF;
306 return 0;
307 }
308 assert(n > 0);
309 assert(line[n-1] == '\n');
310
311 if (n == 1) {
312 log_trace("Received empty line, event is ready");
313 return 1;
314 }
315
316 /* MESSAGE=xxx\n
317 or
318 COREDUMP\n
319 LLLLLLLL0011223344...\n
320 */
321 sep = memchr(line, '=', n);
322 if (sep) {
323 /* chomp newline */
324 n--;
325
326 if (!journal_field_valid(line, sep - line, true)) {
327 char buf[64], *t;
328
329 t = strndupa_safe(line, sep - line);
330 log_debug("Ignoring invalid field: \"%s\"",
331 cellescape(buf, sizeof buf, t));
332
333 return 0;
334 }
335
336 line[n] = '\0';
337 r = process_special_field(imp, line);
338 if (r != 0)
339 return r < 0 ? r : 0;
340
341 r = iovw_put(&imp->iovw, line, n);
342 if (r < 0)
343 return r;
344 } else {
345 if (!journal_field_valid(line, n - 1, true)) {
346 char buf[64], *t;
347
348 t = strndupa_safe(line, n - 1);
349 log_debug("Ignoring invalid field: \"%s\"",
350 cellescape(buf, sizeof buf, t));
351
352 return 0;
353 }
354
355 /* replace \n with = */
356 line[n-1] = '=';
357
358 imp->field_len = n;
359 imp->state = IMPORTER_STATE_DATA_START;
360
361 /* we cannot put the field in iovec until we have all data */
362 }
363
364 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
365
366 return 0; /* continue */
367 }
368
369 case IMPORTER_STATE_DATA_START:
370 assert(imp->data_size == 0);
371
372 r = get_data_size(imp);
373 // log_debug("get_data_size() -> %d", r);
374 if (r < 0)
375 return r;
376 if (r == 0) {
377 imp->state = IMPORTER_STATE_EOF;
378 return 0;
379 }
380
381 imp->state = imp->data_size > 0 ?
382 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
383
384 return 0; /* continue */
385
386 case IMPORTER_STATE_DATA: {
387 void *data;
388 char *field;
389
390 assert(imp->data_size > 0);
391
392 r = get_data_data(imp, &data);
393 // log_debug("get_data_data() -> %d", r);
394 if (r < 0)
395 return r;
396 if (r == 0) {
397 imp->state = IMPORTER_STATE_EOF;
398 return 0;
399 }
400
401 assert(data);
402
403 field = (char*) data - sizeof(uint64_t) - imp->field_len;
404 memmove(field + sizeof(uint64_t), field, imp->field_len);
405
406 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
407 if (r < 0)
408 return r;
409
410 imp->state = IMPORTER_STATE_DATA_FINISH;
411
412 return 0; /* continue */
413 }
414
415 case IMPORTER_STATE_DATA_FINISH:
416 r = get_data_newline(imp);
417 // log_debug("get_data_newline() -> %d", r);
418 if (r < 0)
419 return r;
420 if (r == 0) {
421 imp->state = IMPORTER_STATE_EOF;
422 return 0;
423 }
424
425 imp->data_size = 0;
426 imp->state = IMPORTER_STATE_LINE;
427
428 return 0; /* continue */
429 default:
430 assert_not_reached();
431 }
432 }
433
434 int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
435 assert(imp);
436 assert(imp->state != IMPORTER_STATE_EOF);
437
438 if (!realloc_buffer(imp, imp->filled + size))
439 return log_error_errno(ENOMEM,
440 "Failed to store received data of size %zu "
441 "(in addition to existing %zu bytes with %zu filled): %m",
442 size, MALLOC_SIZEOF_SAFE(imp->buf), imp->filled);
443
444 memcpy(imp->buf + imp->filled, data, size);
445 imp->filled += size;
446
447 return 0;
448 }
449
450 void journal_importer_drop_iovw(JournalImporter *imp) {
451 size_t remain, target;
452
453 /* This function drops processed data that along with the iovw that points at it */
454
455 iovw_free_contents(&imp->iovw, false);
456
457 /* possibly reset buffer position */
458 remain = imp->filled - imp->offset;
459
460 if (remain == 0) /* no brainer */
461 imp->offset = imp->scanned = imp->filled = 0;
462 else if (imp->offset > MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled &&
463 imp->offset > remain) {
464 memcpy(imp->buf, imp->buf + imp->offset, remain);
465 imp->offset = imp->scanned = 0;
466 imp->filled = remain;
467 }
468
469 target = MALLOC_SIZEOF_SAFE(imp->buf);
470 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
471 target /= 2;
472 if (target < MALLOC_SIZEOF_SAFE(imp->buf)) {
473 char *tmp;
474 size_t old_size;
475
476 old_size = MALLOC_SIZEOF_SAFE(imp->buf);
477
478 tmp = realloc(imp->buf, target);
479 if (!tmp)
480 log_warning("Failed to reallocate buffer to (smaller) size %zu",
481 target);
482 else {
483 log_debug("Reallocated buffer from %zu to %zu bytes",
484 old_size, target);
485 imp->buf = tmp;
486 }
487 }
488 }
489
490 bool journal_importer_eof(const JournalImporter *imp) {
491 return imp->state == IMPORTER_STATE_EOF;
492 }