git.ipfire.org Git - thirdparty/systemd.git/blame - src/shared/journal-importer.c
alloc-util: simplify GREEDY_REALLOC() logic by relying on malloc_usable_size()
[thirdparty/systemd.git] / src / shared / journal-importer.c
CommitLineData
db9ecf05 1/* SPDX-License-Identifier: LGPL-2.1-or-later */
b18453ed 2
dccca82b 3#include <errno.h>
319a4f4b 4#include <malloc.h>
b18453ed
ZJS
5#include <unistd.h>
6
7#include "alloc-util.h"
4bbccb02 8#include "errno-util.h"
b778252b 9#include "escape.h"
b18453ed 10#include "fd-util.h"
e6a7ec4b 11#include "io-util.h"
41b0b127 12#include "journal-file.h"
e6a7ec4b 13#include "journal-importer.h"
1e448731 14#include "journal-util.h"
b18453ed
ZJS
15#include "parse-util.h"
16#include "string-util.h"
f652c62d 17#include "unaligned.h"
b18453ed
ZJS
18
/* Parser states of the importer; one of these values is kept in imp->state. */
enum {
        IMPORTER_STATE_LINE        = 0, /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START  = 1, /* reading binary data header */
        IMPORTER_STATE_DATA        = 2, /* reading binary data */
        IMPORTER_STATE_DATA_FINISH = 3, /* expecting newline */
        IMPORTER_STATE_EOF         = 4, /* done */
};
26
b18453ed
ZJS
27void journal_importer_cleanup(JournalImporter *imp) {
28 if (imp->fd >= 0 && !imp->passive_fd) {
29 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
30 safe_close(imp->fd);
31 }
32
2ddb70d2 33 free(imp->name);
b18453ed 34 free(imp->buf);
11e6d971 35 iovw_free_contents(&imp->iovw, false);
b18453ed
ZJS
36}
37
38static char* realloc_buffer(JournalImporter *imp, size_t size) {
39 char *b, *old = imp->buf;
40
319a4f4b 41 b = GREEDY_REALLOC(imp->buf, size);
b18453ed
ZJS
42 if (!b)
43 return NULL;
44
45 iovw_rebase(&imp->iovw, old, imp->buf);
46
47 return b;
48}
49
50static int get_line(JournalImporter *imp, char **line, size_t *size) {
51 ssize_t n;
52 char *c = NULL;
53
54 assert(imp);
55 assert(imp->state == IMPORTER_STATE_LINE);
56 assert(imp->offset <= imp->filled);
319a4f4b 57 assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
b18453ed
ZJS
58 assert(imp->fd >= 0);
59
60 for (;;) {
61 if (imp->buf) {
62 size_t start = MAX(imp->scanned, imp->offset);
63
64 c = memchr(imp->buf + start, '\n',
65 imp->filled - start);
4e361acc 66 if (c)
b18453ed
ZJS
67 break;
68 }
69
70 imp->scanned = imp->filled;
baaa35ad 71 if (imp->scanned >= DATA_SIZE_MAX)
ef4d6abe 72 return log_error_errno(SYNTHETIC_ERRNO(ENOBUFS),
baaa35ad
ZJS
73 "Entry is bigger than %u bytes.",
74 DATA_SIZE_MAX);
b18453ed
ZJS
75
76 if (imp->passive_fd)
77 /* we have to wait for some data to come to us */
78 return -EAGAIN;
79
80 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
81 we reallocate it, we'll increase the size at least a bit. */
82 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
319a4f4b 83 if (MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled < LINE_CHUNK &&
b18453ed
ZJS
84 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
85 return log_oom();
86
87 assert(imp->buf);
319a4f4b
LP
88 assert(MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled >= LINE_CHUNK ||
89 MALLOC_SIZEOF_SAFE(imp->buf) >= ENTRY_SIZE_MAX);
b18453ed
ZJS
90
91 n = read(imp->fd,
92 imp->buf + imp->filled,
319a4f4b 93 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
b18453ed
ZJS
94 if (n < 0) {
95 if (errno != EAGAIN)
96 log_error_errno(errno, "read(%d, ..., %zu): %m",
97 imp->fd,
319a4f4b 98 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
b18453ed
ZJS
99 return -errno;
100 } else if (n == 0)
101 return 0;
102
103 imp->filled += n;
104 }
105
106 *line = imp->buf + imp->offset;
107 *size = c + 1 - imp->buf - imp->offset;
108 imp->offset += *size;
109
110 return 1;
111}
112
113static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
114
115 assert(imp);
3742095b 116 assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
b18453ed
ZJS
117 assert(size <= DATA_SIZE_MAX);
118 assert(imp->offset <= imp->filled);
319a4f4b 119 assert(imp->filled <= MALLOC_SIZEOF_SAFE(imp->buf));
b18453ed
ZJS
120 assert(imp->fd >= 0);
121 assert(data);
122
123 while (imp->filled - imp->offset < size) {
124 int n;
125
126 if (imp->passive_fd)
127 /* we have to wait for some data to come to us */
128 return -EAGAIN;
129
130 if (!realloc_buffer(imp, imp->offset + size))
131 return log_oom();
132
133 n = read(imp->fd, imp->buf + imp->filled,
319a4f4b 134 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
b18453ed
ZJS
135 if (n < 0) {
136 if (errno != EAGAIN)
137 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
319a4f4b 138 MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled);
b18453ed
ZJS
139 return -errno;
140 } else if (n == 0)
141 return 0;
142
143 imp->filled += n;
144 }
145
146 *data = imp->buf + imp->offset;
147 imp->offset += size;
148
149 return 1;
150}
151
152static int get_data_size(JournalImporter *imp) {
153 int r;
154 void *data;
155
156 assert(imp);
157 assert(imp->state == IMPORTER_STATE_DATA_START);
158 assert(imp->data_size == 0);
159
160 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
161 if (r <= 0)
162 return r;
163
f652c62d 164 imp->data_size = unaligned_read_le64(data);
baaa35ad
ZJS
165 if (imp->data_size > DATA_SIZE_MAX)
166 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
167 "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
168 imp->data_size, DATA_SIZE_MAX);
b18453ed
ZJS
169 if (imp->data_size == 0)
170 log_warning("Binary field with zero length");
171
172 return 1;
173}
174
175static int get_data_data(JournalImporter *imp, void **data) {
176 int r;
177
178 assert(imp);
179 assert(data);
180 assert(imp->state == IMPORTER_STATE_DATA);
181
182 r = fill_fixed_size(imp, data, imp->data_size);
183 if (r <= 0)
184 return r;
185
186 return 1;
187}
188
189static int get_data_newline(JournalImporter *imp) {
190 int r;
191 char *data;
192
193 assert(imp);
194 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
195
196 r = fill_fixed_size(imp, (void**) &data, 1);
197 if (r <= 0)
198 return r;
199
200 assert(data);
201 if (*data != '\n') {
b778252b
ZJS
202 char buf[4];
203 int l;
204
205 l = cescape_char(*data, buf);
baaa35ad
ZJS
206 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
207 "Expected newline, got '%.*s'", l, buf);
b18453ed
ZJS
208 }
209
210 return 1;
211}
212
c0b6ada7
ZJS
213static int process_special_field(JournalImporter *imp, char *line) {
214 const char *value;
cca24fc3 215 char buf[CELLESCAPE_DEFAULT_LENGTH];
b18453ed
ZJS
216 int r;
217
218 assert(line);
b18453ed 219
c0b6ada7
ZJS
220 value = startswith(line, "__CURSOR=");
221 if (value)
b18453ed
ZJS
222 /* ignore __CURSOR */
223 return 1;
224
c0b6ada7
ZJS
225 value = startswith(line, "__REALTIME_TIMESTAMP=");
226 if (value) {
41b0b127 227 uint64_t x;
cca24fc3 228
c0b6ada7 229 r = safe_atou64(value, &x);
b18453ed 230 if (r < 0)
cca24fc3 231 return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
c0b6ada7 232 cellescape(buf, sizeof buf, value));
41b0b127
ZJS
233 else if (!VALID_REALTIME(x)) {
234 log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, x);
235 return -ERANGE;
236 }
237
238 imp->ts.realtime = x;
239 return 1;
b18453ed
ZJS
240 }
241
c0b6ada7
ZJS
242 value = startswith(line, "__MONOTONIC_TIMESTAMP=");
243 if (value) {
41b0b127 244 uint64_t x;
cca24fc3 245
c0b6ada7 246 r = safe_atou64(value, &x);
b18453ed 247 if (r < 0)
cca24fc3 248 return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
c0b6ada7 249 cellescape(buf, sizeof buf, value));
41b0b127
ZJS
250 else if (!VALID_MONOTONIC(x)) {
251 log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, x);
252 return -ERANGE;
253 }
254
255 imp->ts.monotonic = x;
256 return 1;
b18453ed
ZJS
257 }
258
c0b6ada7
ZJS
259 /* Just a single underline, but it needs special treatment too. */
260 value = startswith(line, "_BOOT_ID=");
261 if (value) {
262 r = sd_id128_from_string(value, &imp->boot_id);
263 if (r < 0)
264 return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
265 cellescape(buf, sizeof buf, value));
266
267 /* store the field in the usual fashion too */
268 return 0;
269 }
270
271 value = startswith(line, "__");
272 if (value) {
273 log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf, sizeof buf, value));
b18453ed
ZJS
274 return 1;
275 }
276
277 /* no dunder */
278 return 0;
279}
280
281int journal_importer_process_data(JournalImporter *imp) {
282 int r;
283
284 switch(imp->state) {
285 case IMPORTER_STATE_LINE: {
286 char *line, *sep;
287 size_t n = 0;
288
289 assert(imp->data_size == 0);
290
291 r = get_line(imp, &line, &n);
292 if (r < 0)
293 return r;
294 if (r == 0) {
295 imp->state = IMPORTER_STATE_EOF;
d74dc4f2 296 return 0;
b18453ed
ZJS
297 }
298 assert(n > 0);
299 assert(line[n-1] == '\n');
300
301 if (n == 1) {
302 log_trace("Received empty line, event is ready");
303 return 1;
304 }
305
b18453ed
ZJS
306 /* MESSAGE=xxx\n
307 or
308 COREDUMP\n
309 LLLLLLLL0011223344...\n
310 */
311 sep = memchr(line, '=', n);
312 if (sep) {
313 /* chomp newline */
314 n--;
315
1e448731
ZJS
316 if (!journal_field_valid(line, sep - line, true)) {
317 char buf[64], *t;
318
319 t = strndupa(line, sep - line);
320 log_debug("Ignoring invalid field: \"%s\"",
c0b6ada7 321 cellescape(buf, sizeof buf, t));
1e448731
ZJS
322
323 return 0;
324 }
325
bcac9822 326 line[n] = '\0';
c0b6ada7 327 r = process_special_field(imp, line);
bcac9822
ZJS
328 if (r != 0)
329 return r < 0 ? r : 0;
330
b18453ed
ZJS
331 r = iovw_put(&imp->iovw, line, n);
332 if (r < 0)
333 return r;
334 } else {
8786d4bb
YW
335 if (!journal_field_valid(line, n - 1, true)) {
336 char buf[64], *t;
337
338 t = strndupa(line, n - 1);
339 log_debug("Ignoring invalid field: \"%s\"",
340 cellescape(buf, sizeof buf, t));
341
342 return 0;
343 }
344
b18453ed
ZJS
345 /* replace \n with = */
346 line[n-1] = '=';
347
348 imp->field_len = n;
349 imp->state = IMPORTER_STATE_DATA_START;
350
351 /* we cannot put the field in iovec until we have all data */
352 }
353
354 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
355
356 return 0; /* continue */
357 }
358
359 case IMPORTER_STATE_DATA_START:
360 assert(imp->data_size == 0);
361
362 r = get_data_size(imp);
363 // log_debug("get_data_size() -> %d", r);
364 if (r < 0)
365 return r;
366 if (r == 0) {
367 imp->state = IMPORTER_STATE_EOF;
368 return 0;
369 }
370
371 imp->state = imp->data_size > 0 ?
372 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
373
374 return 0; /* continue */
375
376 case IMPORTER_STATE_DATA: {
377 void *data;
378 char *field;
379
380 assert(imp->data_size > 0);
381
382 r = get_data_data(imp, &data);
383 // log_debug("get_data_data() -> %d", r);
384 if (r < 0)
385 return r;
386 if (r == 0) {
387 imp->state = IMPORTER_STATE_EOF;
388 return 0;
389 }
390
391 assert(data);
392
393 field = (char*) data - sizeof(uint64_t) - imp->field_len;
394 memmove(field + sizeof(uint64_t), field, imp->field_len);
395
396 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
397 if (r < 0)
398 return r;
399
400 imp->state = IMPORTER_STATE_DATA_FINISH;
401
402 return 0; /* continue */
403 }
404
405 case IMPORTER_STATE_DATA_FINISH:
406 r = get_data_newline(imp);
407 // log_debug("get_data_newline() -> %d", r);
408 if (r < 0)
409 return r;
410 if (r == 0) {
411 imp->state = IMPORTER_STATE_EOF;
412 return 0;
413 }
414
415 imp->data_size = 0;
416 imp->state = IMPORTER_STATE_LINE;
417
418 return 0; /* continue */
419 default:
420 assert_not_reached("wtf?");
421 }
422}
423
424int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
425 assert(imp);
426 assert(imp->state != IMPORTER_STATE_EOF);
427
baaa35ad
ZJS
428 if (!realloc_buffer(imp, imp->filled + size))
429 return log_error_errno(SYNTHETIC_ERRNO(ENOMEM),
430 "Failed to store received data of size %zu "
431 "(in addition to existing %zu bytes with %zu filled): %s",
319a4f4b 432 size, MALLOC_SIZEOF_SAFE(imp->buf), imp->filled,
4bbccb02 433 strerror_safe(ENOMEM));
b18453ed
ZJS
434
435 memcpy(imp->buf + imp->filled, data, size);
436 imp->filled += size;
437
438 return 0;
439}
440
441void journal_importer_drop_iovw(JournalImporter *imp) {
442 size_t remain, target;
443
444 /* This function drops processed data that along with the iovw that points at it */
445
11e6d971 446 iovw_free_contents(&imp->iovw, false);
b18453ed
ZJS
447
448 /* possibly reset buffer position */
449 remain = imp->filled - imp->offset;
450
451 if (remain == 0) /* no brainer */
452 imp->offset = imp->scanned = imp->filled = 0;
319a4f4b 453 else if (imp->offset > MALLOC_SIZEOF_SAFE(imp->buf) - imp->filled &&
b18453ed
ZJS
454 imp->offset > remain) {
455 memcpy(imp->buf, imp->buf + imp->offset, remain);
456 imp->offset = imp->scanned = 0;
457 imp->filled = remain;
458 }
459
319a4f4b 460 target = MALLOC_SIZEOF_SAFE(imp->buf);
b18453ed
ZJS
461 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
462 target /= 2;
319a4f4b 463 if (target < MALLOC_SIZEOF_SAFE(imp->buf)) {
b18453ed 464 char *tmp;
319a4f4b
LP
465 size_t old_size;
466
467 old_size = MALLOC_SIZEOF_SAFE(imp->buf);
b18453ed
ZJS
468
469 tmp = realloc(imp->buf, target);
470 if (!tmp)
471 log_warning("Failed to reallocate buffer to (smaller) size %zu",
472 target);
473 else {
474 log_debug("Reallocated buffer from %zu to %zu bytes",
319a4f4b 475 old_size, target);
b18453ed 476 imp->buf = tmp;
b18453ed
ZJS
477 }
478 }
479}
480
481bool journal_importer_eof(const JournalImporter *imp) {
482 return imp->state == IMPORTER_STATE_EOF;
483}