1 /* SPDX-License-Identifier: LGPL-2.1+ */
6 #include "alloc-util.h"
10 #include "journal-file.h"
11 #include "journal-importer.h"
12 #include "journal-util.h"
13 #include "parse-util.h"
14 #include "string-util.h"
15 #include "unaligned.h"
18 IMPORTER_STATE_LINE
= 0, /* waiting to read, or reading line */
19 IMPORTER_STATE_DATA_START
, /* reading binary data header */
20 IMPORTER_STATE_DATA
, /* reading binary data */
21 IMPORTER_STATE_DATA_FINISH
, /* expecting newline */
22 IMPORTER_STATE_EOF
, /* done */
/* Append one entry describing the buffer (data, len) to the iovec array held by iovw.
 *
 * The visible code compares iovw->count against ENTRY_FIELD_COUNT_MAX, asks GREEDY_REALLOC()
 * for room for count + 1 entries, and then stores IOVEC_MAKE(data, len) at index count while
 * post-incrementing count.
 *
 * NOTE(review): the statements executed when either check fails, and the final return value,
 * are not visible in this listing; confirm against the full source. */
25 static int iovw_put(struct iovec_wrapper
*iovw
, void* data
, size_t len
) {
/* Upper bound on the number of entries; the action taken when it is reached is not shown here. */
26 if (iovw
->count
>= ENTRY_FIELD_COUNT_MAX
)
/* Make room for one more entry; the action taken on allocation failure is not shown here. */
29 if (!GREEDY_REALLOC(iovw
->iovec
, iovw
->size_bytes
, iovw
->count
+ 1))
/* Record the new entry and bump the entry count in one step. */
32 iovw
->iovec
[iovw
->count
++] = IOVEC_MAKE(data
, len
);
/* Reset an iovec wrapper to the empty state.
 *
 * iovw->iovec is handed to mfree() and replaced by its result, and both bookkeeping
 * fields (size_bytes and count) are set to 0. Nothing in the visible code touches the
 * memory that the individual entries point at. */
36 static void iovw_free_contents(struct iovec_wrapper
*iovw
) {
37 iovw
->iovec
= mfree(iovw
->iovec
);
38 iovw
->size_bytes
= iovw
->count
= 0;
/* Re-point every entry of iovw from a buffer that started at old to one that starts at new.
 *
 * For each of the count entries, iov_base has old subtracted and new added, so each entry
 * keeps the same byte offset relative to the start of the buffer.
 *
 * NOTE(review): the declaration of the loop index i is not visible in this listing. */
41 static void iovw_rebase(struct iovec_wrapper
*iovw
, char *old
, char *new) {
44 for (i
= 0; i
< iovw
->count
; i
++)
/* (iov_base - old) is the entry offset inside the old buffer; adding new relocates it. */
45 iovw
->iovec
[i
].iov_base
= (char*) iovw
->iovec
[i
].iov_base
- old
+ new;
/* Accumulate the iov_len of all count entries of iovw into n.
 *
 * NOTE(review): the declaration and initialisation of n and i, and the return statement,
 * are not visible in this listing; presumably n starts at 0 and is the value returned.
 * Confirm against the full source. */
48 size_t iovw_size(struct iovec_wrapper
*iovw
) {
51 for (i
= 0; i
< iovw
->count
; i
++)
52 n
+= iovw
->iovec
[i
].iov_len
;
/* Tear down the resources held by an importer.
 *
 * When imp->fd is valid (>= 0) and imp->passive_fd is not set, a debug message with the
 * importer name (or a fixed fallback name when imp->name is unset) and the fd is logged.
 * The iovec wrapper is emptied with iovw_free_contents().
 *
 * NOTE(review): the listing skips the lines between the log call and iovw_free_contents();
 * presumably the fd is closed and the owned buffers are released there. Confirm against
 * the full source. */
57 void journal_importer_cleanup(JournalImporter
*imp
) {
/* Only descriptors that are valid and not marked passive are announced as being closed. */
58 if (imp
->fd
>= 0 && !imp
->passive_fd
) {
59 log_debug("Closing %s (fd=%d)", imp
->name
?: "importer", imp
->fd
);
65 iovw_free_contents(&imp
->iovw
);
/* Resize the importer buffer so that it can hold size bytes, keeping the iovec entries valid.
 *
 * The previous buffer address is remembered in old before GREEDY_REALLOC() is invoked on
 * imp->buf (with imp->size passed as its bookkeeping argument). Afterwards the iovec entries
 * are rebased from old to the current imp->buf, because they point into this buffer.
 *
 * Callers in this file treat a false (NULL) result as failure.
 *
 * NOTE(review): the handling of a failed reallocation and the return statement are not
 * visible in this listing; confirm against the full source. */
68 static char* realloc_buffer(JournalImporter
*imp
, size_t size
) {
/* Capture the old address first: it is needed to rebase the iovec entries afterwards. */
69 char *b
, *old
= imp
->buf
;
71 b
= GREEDY_REALLOC(imp
->buf
, imp
->size
, size
);
75 iovw_rebase(&imp
->iovw
, old
, imp
->buf
);
/* Fetch the next newline-terminated line from the importer buffer.
 *
 * Preconditions (asserted): the importer is in IMPORTER_STATE_LINE, offset <= filled <= size,
 * and a non-NULL buffer has a non-zero size.
 *
 * Visible behaviour:
 *  - The newline search starts at MAX(scanned, offset), so bytes that were already searched
 *    are not searched again.
 *  - When the search is given up, scanned is advanced to filled; if scanned has reached
 *    DATA_SIZE_MAX the entry is rejected with ENOBUFS.
 *  - When fewer than LINE_CHUNK bytes are free, the buffer is grown to filled + LINE_CHUNK,
 *    capped at ENTRY_SIZE_MAX.
 *  - New data is placed at buf + filled, with size - filled bytes of room.
 *  - On success *line is set to buf + offset, *size to the line length including the
 *    newline, and offset is advanced past the line.
 *
 * NOTE(review): this listing omits several lines (local declarations, the loop head, the
 * remaining memchr() arguments, the read call head, the branches on its result and the
 * return statements). The log message suggests the data source is read(); confirm against
 * the full source. */
80 static int get_line(JournalImporter
*imp
, char **line
, size_t *size
) {
85 assert(imp
->state
== IMPORTER_STATE_LINE
);
86 assert(imp
->offset
<= imp
->filled
);
87 assert(imp
->filled
<= imp
->size
);
88 assert(!imp
->buf
|| imp
->size
> 0);
/* Resume searching where the previous attempt stopped, but never before the read offset. */
93 size_t start
= MAX(imp
->scanned
, imp
->offset
);
95 c
= memchr(imp
->buf
+ start
, '\n',
/* Everything currently in the buffer has now been searched. */
101 imp
->scanned
= imp
->filled
;
102 if (imp
->scanned
>= DATA_SIZE_MAX
)
103 return log_error_errno(SYNTHETIC_ERRNO(ENOBUFS
),
104 "Entry is bigger than %u bytes.",
108 /* we have to wait for some data to come to us */
111 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
112 we reallocate it, we'll increase the size at least a bit. */
113 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
/* Grow only when less than one LINE_CHUNK is free; never request more than ENTRY_SIZE_MAX. */
114 if (imp
->size
- imp
->filled
< LINE_CHUNK
&&
115 !realloc_buffer(imp
, MIN(imp
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
119 assert(imp
->size
- imp
->filled
>= LINE_CHUNK
||
120 imp
->size
== ENTRY_SIZE_MAX
);
123 imp
->buf
+ imp
->filled
,
124 imp
->size
- imp
->filled
);
127 log_error_errno(errno
, "read(%d, ..., %zu): %m",
129 imp
->size
- imp
->filled
);
/* Hand back the line that starts at offset, newline included (c + 1), and consume it. */
137 *line
= imp
->buf
+ imp
->offset
;
138 *size
= c
+ 1 - imp
->buf
- imp
->offset
;
139 imp
->offset
+= *size
;
/* Make sure that size bytes are available in the buffer starting at imp->offset.
 *
 * Preconditions (asserted): the importer is in one of the three DATA states, size does not
 * exceed DATA_SIZE_MAX, offset <= filled <= size of the buffer, buf is NULL exactly when the
 * buffer size is 0, and imp->fd is a valid descriptor.
 *
 * Visible behaviour: while fewer than size unread bytes are buffered, the buffer is grown to
 * offset + size via realloc_buffer() and more bytes are read from imp->fd into buf + filled
 * (at most size - filled bytes of room). A read error is logged with errno. On success *data
 * is set to buf + offset; offset itself is not advanced in the visible code.
 *
 * NOTE(review): the declaration of n, the branches on the result of realloc_buffer() and
 * read(), the update of filled and the return statements are not visible in this listing;
 * confirm against the full source. */
144 static int fill_fixed_size(JournalImporter
*imp
, void **data
, size_t size
) {
147 assert(IN_SET(imp
->state
, IMPORTER_STATE_DATA_START
, IMPORTER_STATE_DATA
, IMPORTER_STATE_DATA_FINISH
));
148 assert(size
<= DATA_SIZE_MAX
);
149 assert(imp
->offset
<= imp
->filled
);
150 assert(imp
->filled
<= imp
->size
);
151 assert(imp
->buf
|| imp
->size
== 0);
152 assert(!imp
->buf
|| imp
->size
> 0);
153 assert(imp
->fd
>= 0);
/* Loop until the unread part of the buffer (filled - offset) covers the requested size. */
156 while (imp
->filled
- imp
->offset
< size
) {
160 /* we have to wait for some data to come to us */
163 if (!realloc_buffer(imp
, imp
->offset
+ size
))
166 n
= read(imp
->fd
, imp
->buf
+ imp
->filled
,
167 imp
->size
- imp
->filled
);
170 log_error_errno(errno
, "read(%d, ..., %zu): %m", imp
->fd
,
171 imp
->size
- imp
->filled
);
/* The caller receives a pointer into the importer buffer, not a copy. */
179 *data
= imp
->buf
+ imp
->offset
;
/* Read the length header of a binary field and store it in imp->data_size.
 *
 * Preconditions (asserted): the importer is in IMPORTER_STATE_DATA_START and data_size is 0.
 *
 * Visible behaviour: sizeof(uint64_t) bytes are requested through fill_fixed_size() and
 * decoded as a little-endian 64-bit value with unaligned_read_le64(). A value above
 * DATA_SIZE_MAX is rejected with EINVAL; a value of 0 only triggers a warning.
 *
 * NOTE(review): the local declarations, the check of r after fill_fixed_size(), any
 * adjustment of imp->offset and the final return are not visible in this listing; confirm
 * against the full source. */
185 static int get_data_size(JournalImporter
*imp
) {
190 assert(imp
->state
== IMPORTER_STATE_DATA_START
);
191 assert(imp
->data_size
== 0);
193 r
= fill_fixed_size(imp
, &data
, sizeof(uint64_t));
/* The header is read with an unaligned-safe helper since it sits at an arbitrary buffer offset. */
197 imp
->data_size
= unaligned_read_le64(data
);
198 if (imp
->data_size
> DATA_SIZE_MAX
)
199 return log_error_errno(SYNTHETIC_ERRNO(EINVAL
),
200 "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
201 imp
->data_size
, DATA_SIZE_MAX
);
/* A zero-length binary field is tolerated but reported. */
202 if (imp
->data_size
== 0)
203 log_warning("Binary field with zero length");
/* Obtain the payload of a binary field.
 *
 * Precondition (asserted): the importer is in IMPORTER_STATE_DATA.
 *
 * Visible behaviour: imp->data_size bytes are requested through fill_fixed_size(), which
 * stores a pointer to them in *data.
 *
 * NOTE(review): the declaration of r, the handling of the result and the return statement
 * are not visible in this listing; confirm against the full source. */
208 static int get_data_data(JournalImporter
*imp
, void **data
) {
213 assert(imp
->state
== IMPORTER_STATE_DATA
);
215 r
= fill_fixed_size(imp
, data
, imp
->data_size
);
/* Consume the single byte that must follow a binary field payload.
 *
 * Precondition (asserted): the importer is in IMPORTER_STATE_DATA_FINISH.
 *
 * Visible behaviour: one byte is requested through fill_fixed_size(). On the error path
 * the byte is escaped with cescape_char() into buf and reported with EINVAL as an
 * unexpected character where a newline was expected.
 *
 * NOTE(review): the local declarations, the check of r, the comparison of the byte against
 * newline, any adjustment of imp->offset and the success return are not visible in this
 * listing; confirm against the full source. */
222 static int get_data_newline(JournalImporter
*imp
) {
227 assert(imp
->state
== IMPORTER_STATE_DATA_FINISH
);
229 r
= fill_fixed_size(imp
, (void**) &data
, 1);
/* Escape the offending byte so that the log message stays printable. */
238 l
= cescape_char(*data
, buf
);
239 return log_error_errno(SYNTHETIC_ERRNO(EINVAL
),
240 "Expected newline, got '%.*s'", l
, buf
);
/* Recognise fields that carry entry metadata and record their values in the importer.
 *
 * The line is matched with startswith() against these prefixes, in this order:
 *  - __CURSOR=              : ignored, per the existing comment.
 *  - __REALTIME_TIMESTAMP=  : parsed with safe_atou64(); a parse failure is reported as a
 *                             warning with the escaped value; a value failing
 *                             VALID_REALTIME() is warned about; the visible assignment
 *                             stores x in imp->ts.realtime.
 *  - __MONOTONIC_TIMESTAMP= : same pattern with VALID_MONOTONIC() and imp->ts.monotonic.
 *  - _BOOT_ID=              : parsed with sd_id128_from_string() into imp->boot_id; a parse
 *                             failure is reported as a warning. Per the existing comment
 *                             the field is also stored in the usual fashion.
 *  - __                     : any other double-underscore field is logged at notice level
 *                             as unknown and ignored.
 *
 * buf is scratch space for cellescape(), used to make values safe to log.
 *
 * NOTE(review): the declarations of value, x and r, the conditions around each branch and
 * all return values other than the warning returns are not visible in this listing;
 * confirm against the full source. */
246 static int process_special_field(JournalImporter
*imp
, char *line
) {
248 char buf
[CELLESCAPE_DEFAULT_LENGTH
];
253 value
= startswith(line
, "__CURSOR=");
255 /* ignore __CURSOR */
258 value
= startswith(line
, "__REALTIME_TIMESTAMP=");
262 r
= safe_atou64(value
, &x
);
264 return log_warning_errno(r
, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
265 cellescape(buf
, sizeof buf
, value
));
266 else if (!VALID_REALTIME(x
)) {
267 log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64
, x
);
271 imp
->ts
.realtime
= x
;
275 value
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
279 r
= safe_atou64(value
, &x
);
281 return log_warning_errno(r
, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
282 cellescape(buf
, sizeof buf
, value
));
283 else if (!VALID_MONOTONIC(x
)) {
284 log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64
, x
);
288 imp
->ts
.monotonic
= x
;
292 /* Just a single underline, but it needs special treatment too. */
293 value
= startswith(line
, "_BOOT_ID=");
295 r
= sd_id128_from_string(value
, &imp
->boot_id
);
297 return log_warning_errno(r
, "Failed to parse _BOOT_ID '%s': %m",
298 cellescape(buf
, sizeof buf
, value
));
300 /* store the field in the usual fashion too */
/* Catch-all for any remaining double-underscore field. */
304 value
= startswith(line
, "__");
306 log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf
, sizeof buf
, value
));
/* Advance the importer state machine by one step.
 *
 * The visible cases are:
 *  - IMPORTER_STATE_LINE: fetch a line with get_line(). The line ends in a newline
 *    (asserted). An empty line marks the entry as ready (per the trace message). Otherwise
 *    the line is searched for the first equals sign. Invalid field names are logged and
 *    ignored; process_special_field() gets a chance to consume metadata fields; ordinary
 *    text fields are appended with iovw_put(). Per the existing comments, a line without
 *    a separator starts a binary field: the state moves to IMPORTER_STATE_DATA_START and
 *    the field is only added once all data is present.
 *  - IMPORTER_STATE_DATA_START: read the length header with get_data_size(), then move to
 *    IMPORTER_STATE_DATA, or directly to IMPORTER_STATE_DATA_FINISH when the length is 0.
 *  - IMPORTER_STATE_DATA: read the payload with get_data_data(), make field name and
 *    payload contiguous, append them with iovw_put() and move to
 *    IMPORTER_STATE_DATA_FINISH.
 *  - IMPORTER_STATE_DATA_FINISH: consume the trailing newline with get_data_newline() and
 *    return to IMPORTER_STATE_LINE.
 * Each case has a visible path that sets the state to IMPORTER_STATE_EOF, and each visible
 * normal path returns 0 (annotated as continue).
 *
 * NOTE(review): the switch head, the local declarations, the conditions guarding most
 * branches (including those leading to IMPORTER_STATE_EOF), and the returns that signal a
 * finished entry are not visible in this listing; confirm against the full source. */
314 int journal_importer_process_data(JournalImporter
*imp
) {
318 case IMPORTER_STATE_LINE
: {
322 assert(imp
->data_size
== 0);
324 r
= get_line(imp
, &line
, &n
);
328 imp
->state
= IMPORTER_STATE_EOF
;
332 assert(line
[n
-1] == '\n');
335 log_trace("Received empty line, event is ready");
342 LLLLLLLL0011223344...\n
/* Look for the separator between field name and value within the n bytes of the line. */
344 sep
= memchr(line
, '=', n
);
349 if (!journal_field_valid(line
, sep
- line
, true)) {
352 t
= strndupa(line
, sep
- line
);
353 log_debug("Ignoring invalid field: \"%s\"",
354 cellescape(buf
, sizeof buf
, t
));
360 r
= process_special_field(imp
, line
);
362 return r
< 0 ? r
: 0;
364 r
= iovw_put(&imp
->iovw
, line
, n
);
368 /* replace \n with = */
372 imp
->state
= IMPORTER_STATE_DATA_START
;
374 /* we cannot put the field in iovec until we have all data */
377 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
379 return 0; /* continue */
382 case IMPORTER_STATE_DATA_START
:
383 assert(imp
->data_size
== 0);
385 r
= get_data_size(imp
);
386 // log_debug("get_data_size() -> %d", r);
390 imp
->state
= IMPORTER_STATE_EOF
;
/* A zero-length payload skips the DATA state and goes straight to the trailing newline. */
394 imp
->state
= imp
->data_size
> 0 ?
395 IMPORTER_STATE_DATA
: IMPORTER_STATE_DATA_FINISH
;
397 return 0; /* continue */
399 case IMPORTER_STATE_DATA
: {
403 assert(imp
->data_size
> 0);
405 r
= get_data_data(imp
, &data
);
406 // log_debug("get_data_data() -> %d", r);
410 imp
->state
= IMPORTER_STATE_EOF
;
/* The field name sits in the buffer right before the 8-byte length header. Slide the name
 * forward over the header so that name and payload form one contiguous region, then add
 * that region (field_len + data_size bytes) to the iovec. */
416 field
= (char*) data
- sizeof(uint64_t) - imp
->field_len
;
417 memmove(field
+ sizeof(uint64_t), field
, imp
->field_len
);
419 r
= iovw_put(&imp
->iovw
, field
+ sizeof(uint64_t), imp
->field_len
+ imp
->data_size
);
423 imp
->state
= IMPORTER_STATE_DATA_FINISH
;
425 return 0; /* continue */
428 case IMPORTER_STATE_DATA_FINISH
:
429 r
= get_data_newline(imp
);
430 // log_debug("get_data_newline() -> %d", r);
434 imp
->state
= IMPORTER_STATE_EOF
;
439 imp
->state
= IMPORTER_STATE_LINE
;
441 return 0; /* continue */
443 assert_not_reached("wtf?");
/* Append size bytes from data to the importer buffer.
 *
 * Precondition (asserted): the importer has not reached IMPORTER_STATE_EOF.
 *
 * Visible behaviour: the buffer is grown to hold filled + size bytes via realloc_buffer();
 * on failure an ENOMEM error is logged and returned. Otherwise the bytes are copied to
 * buf + filled.
 *
 * NOTE(review): the last argument of the error message, the update of imp->filled after
 * the copy and the success return are not visible in this listing; confirm against the
 * full source. */
447 int journal_importer_push_data(JournalImporter
*imp
, const char *data
, size_t size
) {
449 assert(imp
->state
!= IMPORTER_STATE_EOF
);
451 if (!realloc_buffer(imp
, imp
->filled
+ size
))
452 return log_error_errno(SYNTHETIC_ERRNO(ENOMEM
),
453 "Failed to store received data of size %zu "
454 "(in addition to existing %zu bytes with %zu filled): %s",
455 size
, imp
->size
, imp
->filled
,
/* New bytes go right after the data that is already buffered. */
458 memcpy(imp
->buf
+ imp
->filled
, data
, size
);
/* Discard the current entry and reclaim buffer space.
 *
 * Visible behaviour:
 *  - The iovec wrapper is emptied with iovw_free_contents().
 *  - remain is the number of buffered but unconsumed bytes (filled - offset).
 *  - With nothing remaining, offset, scanned and filled are all reset to 0.
 *  - Otherwise, when the consumed prefix (offset) is larger than both the free space at
 *    the end of the buffer and the remaining data, the remaining bytes are moved to the
 *    start of the buffer and the positions are adjusted.
 *  - A shrink target is then computed; if it is below the current size, the buffer is
 *    reallocated with realloc(), and the outcome is logged either way.
 *
 * NOTE(review): the initialisation and update of target, the declaration of tmp, the
 * branches on the realloc() result and the assignments that follow are not visible in this
 * listing; confirm against the full source. */
464 void journal_importer_drop_iovw(JournalImporter
*imp
) {
465 size_t remain
, target
;
467 /* This function drops processed data along with the iovw that points at it */
469 iovw_free_contents(&imp
->iovw
);
471 /* possibly reset buffer position */
472 remain
= imp
->filled
- imp
->offset
;
474 if (remain
== 0) /* no brainer */
475 imp
->offset
= imp
->scanned
= imp
->filled
= 0;
476 else if (imp
->offset
> imp
->size
- imp
->filled
&&
477 imp
->offset
> remain
) {
/* offset > remain guarantees that source and destination do not overlap, so memcpy is fine. */
478 memcpy(imp
->buf
, imp
->buf
+ imp
->offset
, remain
);
479 imp
->offset
= imp
->scanned
= 0;
480 imp
->filled
= remain
;
/* Shrinking is only considered above 16 * LINE_CHUNK and while less than half of target is in use. */
484 while (target
> 16 * LINE_CHUNK
&& imp
->filled
< target
/ 2)
486 if (target
< imp
->size
) {
489 tmp
= realloc(imp
->buf
, target
);
491 log_warning("Failed to reallocate buffer to (smaller) size %zu",
494 log_debug("Reallocated buffer from %zu to %zu bytes",
/* Report whether the importer has reached its final state.
 *
 * Returns true exactly when imp->state equals IMPORTER_STATE_EOF. The importer is only
 * read, never modified. */
502 bool journal_importer_eof(const JournalImporter
*imp
) {
503 return imp
->state
== IMPORTER_STATE_EOF
;