1 /* SPDX-License-Identifier: LGPL-2.1-or-later */
#include <unistd.h>

#include "alloc-util.h"
#include "errno-util.h"
#include "escape.h"
#include "fd-util.h"
#include "journal-file.h"
#include "journal-importer.h"
#include "journal-util.h"
#include "parse-util.h"
#include "string-util.h"
#include "strv.h"
#include "unaligned.h"
/* Parser states of a JournalImporter. journal_importer_process_data() dispatches on imp->state using
 * these values; IMPORTER_STATE_LINE is 0 so that a zero-initialized importer starts out reading a line. */
enum {
        IMPORTER_STATE_LINE = 0,    /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START,  /* reading binary data header */
        IMPORTER_STATE_DATA,        /* reading binary data */
        IMPORTER_STATE_DATA_FINISH, /* expecting newline */
        IMPORTER_STATE_EOF,         /* done */
};
28 void journal_importer_cleanup(JournalImporter
*imp
) {
29 if (imp
->fd
>= 0 && !imp
->passive_fd
) {
30 log_debug("Closing %s (fd=%d)", imp
->name
?: "importer", imp
->fd
);
36 iovw_free_contents(&imp
->iovw
, false);
39 static char* realloc_buffer(JournalImporter
*imp
, size_t size
) {
40 char *b
, *old
= ASSERT_PTR(imp
)->buf
;
42 b
= GREEDY_REALLOC(imp
->buf
, size
);
46 iovw_rebase(&imp
->iovw
, old
, imp
->buf
);
51 static int get_line(JournalImporter
*imp
, char **line
, size_t *size
) {
56 assert(imp
->state
== IMPORTER_STATE_LINE
);
57 assert(imp
->offset
<= imp
->filled
);
58 assert(imp
->filled
<= MALLOC_SIZEOF_SAFE(imp
->buf
));
63 size_t start
= MAX(imp
->scanned
, imp
->offset
);
65 c
= memchr(imp
->buf
+ start
, '\n',
71 imp
->scanned
= imp
->filled
;
72 if (imp
->scanned
>= DATA_SIZE_MAX
)
73 return log_warning_errno(SYNTHETIC_ERRNO(ENOBUFS
),
74 "Entry is bigger than %u bytes.",
78 /* we have to wait for some data to come to us */
81 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
82 we reallocate it, we'll increase the size at least a bit. */
83 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
84 if (MALLOC_SIZEOF_SAFE(imp
->buf
) - imp
->filled
< LINE_CHUNK
&&
85 !realloc_buffer(imp
, MIN(imp
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
89 assert(MALLOC_SIZEOF_SAFE(imp
->buf
) - imp
->filled
>= LINE_CHUNK
||
90 MALLOC_SIZEOF_SAFE(imp
->buf
) >= ENTRY_SIZE_MAX
);
93 imp
->buf
+ imp
->filled
,
94 MALLOC_SIZEOF_SAFE(imp
->buf
) - imp
->filled
);
96 if (ERRNO_IS_DISCONNECT(errno
)) {
97 log_debug_errno(errno
, "Got disconnect for importer %s.", strna(imp
->name
));
101 if (!ERRNO_IS_TRANSIENT(errno
))
102 log_error_errno(errno
, "read(%d, ..., %zu): %m",
104 MALLOC_SIZEOF_SAFE(imp
->buf
) - imp
->filled
);
112 *line
= imp
->buf
+ imp
->offset
;
113 *size
= c
+ 1 - imp
->buf
- imp
->offset
;
114 imp
->offset
+= *size
;
119 static int fill_fixed_size(JournalImporter
*imp
, void **data
, size_t size
) {
122 assert(IN_SET(imp
->state
, IMPORTER_STATE_DATA_START
, IMPORTER_STATE_DATA
, IMPORTER_STATE_DATA_FINISH
));
123 assert(size
<= DATA_SIZE_MAX
);
124 assert(imp
->offset
<= imp
->filled
);
125 assert(imp
->filled
<= MALLOC_SIZEOF_SAFE(imp
->buf
));
126 assert(imp
->fd
>= 0);
129 while (imp
->filled
- imp
->offset
< size
) {
133 /* we have to wait for some data to come to us */
136 if (!realloc_buffer(imp
, imp
->offset
+ size
))
139 n
= read(imp
->fd
, imp
->buf
+ imp
->filled
,
140 MALLOC_SIZEOF_SAFE(imp
->buf
) - imp
->filled
);
142 if (ERRNO_IS_DISCONNECT(errno
)) {
143 log_debug_errno(errno
, "Got disconnect for importer %s.", strna(imp
->name
));
147 if (!ERRNO_IS_TRANSIENT(errno
))
148 log_error_errno(errno
, "read(%d, ..., %zu): %m", imp
->fd
,
149 MALLOC_SIZEOF_SAFE(imp
->buf
) - imp
->filled
);
157 *data
= imp
->buf
+ imp
->offset
;
163 static int get_data_size(JournalImporter
*imp
) {
168 assert(imp
->state
== IMPORTER_STATE_DATA_START
);
169 assert(imp
->data_size
== 0);
171 r
= fill_fixed_size(imp
, &data
, sizeof(uint64_t));
175 imp
->data_size
= unaligned_read_le64(data
);
176 if (imp
->data_size
> DATA_SIZE_MAX
)
177 return log_warning_errno(SYNTHETIC_ERRNO(EINVAL
),
178 "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
179 imp
->data_size
, DATA_SIZE_MAX
);
180 if (imp
->data_size
== 0)
181 log_warning("Binary field with zero length");
186 static int get_data_data(JournalImporter
*imp
, void **data
) {
191 assert(imp
->state
== IMPORTER_STATE_DATA
);
193 r
= fill_fixed_size(imp
, data
, imp
->data_size
);
200 static int get_data_newline(JournalImporter
*imp
) {
205 assert(imp
->state
== IMPORTER_STATE_DATA_FINISH
);
207 r
= fill_fixed_size(imp
, (void**) &data
, 1);
216 l
= cescape_char(*data
, buf
);
217 return log_warning_errno(SYNTHETIC_ERRNO(EINVAL
),
218 "Expected newline, got '%.*s'", l
, buf
);
224 static int process_special_field(JournalImporter
*imp
, char *line
) {
226 char buf
[CELLESCAPE_DEFAULT_LENGTH
];
231 if (STARTSWITH_SET(line
, "__CURSOR=", "__SEQNUM=", "__SEQNUM_ID="))
232 /* ignore __CURSOR=, __SEQNUM=, __SEQNUM_ID= which we cannot replicate */
235 value
= startswith(line
, "__REALTIME_TIMESTAMP=");
239 r
= safe_atou64(value
, &x
);
241 return log_warning_errno(r
, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
242 cellescape(buf
, sizeof buf
, value
));
243 else if (!VALID_REALTIME(x
)) {
244 log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64
, x
);
248 imp
->ts
.realtime
= x
;
252 value
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
256 r
= safe_atou64(value
, &x
);
258 return log_warning_errno(r
, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
259 cellescape(buf
, sizeof buf
, value
));
260 else if (!VALID_MONOTONIC(x
)) {
261 log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64
, x
);
265 imp
->ts
.monotonic
= x
;
269 /* Just a single underline, but it needs special treatment too. */
270 value
= startswith(line
, "_BOOT_ID=");
272 r
= sd_id128_from_string(value
, &imp
->boot_id
);
274 return log_warning_errno(r
, "Failed to parse _BOOT_ID '%s': %m",
275 cellescape(buf
, sizeof buf
, value
));
277 /* store the field in the usual fashion too */
281 value
= startswith(line
, "__");
283 log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf
, sizeof buf
, value
));
291 int journal_importer_process_data(JournalImporter
*imp
) {
294 switch (imp
->state
) {
295 case IMPORTER_STATE_LINE
: {
299 assert(imp
->data_size
== 0);
301 r
= get_line(imp
, &line
, &n
);
305 imp
->state
= IMPORTER_STATE_EOF
;
309 assert(line
[n
-1] == '\n');
312 log_trace("Received empty line, event is ready");
319 LLLLLLLL0011223344...\n
321 sep
= memchr(line
, '=', n
);
326 if (!journal_field_valid(line
, sep
- line
, true)) {
329 t
= strndupa_safe(line
, sep
- line
);
330 log_debug("Ignoring invalid field: \"%s\"",
331 cellescape(buf
, sizeof buf
, t
));
337 r
= process_special_field(imp
, line
);
339 return r
< 0 ? r
: 0;
341 r
= iovw_put(&imp
->iovw
, line
, n
);
345 if (!journal_field_valid(line
, n
- 1, true)) {
348 t
= strndupa_safe(line
, n
- 1);
349 log_debug("Ignoring invalid field: \"%s\"",
350 cellescape(buf
, sizeof buf
, t
));
355 /* replace \n with = */
359 imp
->state
= IMPORTER_STATE_DATA_START
;
361 /* we cannot put the field in iovec until we have all data */
364 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
366 return 0; /* continue */
369 case IMPORTER_STATE_DATA_START
:
370 assert(imp
->data_size
== 0);
372 r
= get_data_size(imp
);
373 // log_debug("get_data_size() -> %d", r);
377 imp
->state
= IMPORTER_STATE_EOF
;
381 imp
->state
= imp
->data_size
> 0 ?
382 IMPORTER_STATE_DATA
: IMPORTER_STATE_DATA_FINISH
;
384 return 0; /* continue */
386 case IMPORTER_STATE_DATA
: {
390 assert(imp
->data_size
> 0);
392 r
= get_data_data(imp
, &data
);
393 // log_debug("get_data_data() -> %d", r);
397 imp
->state
= IMPORTER_STATE_EOF
;
403 field
= (char*) data
- sizeof(uint64_t) - imp
->field_len
;
404 memmove(field
+ sizeof(uint64_t), field
, imp
->field_len
);
406 r
= iovw_put(&imp
->iovw
, field
+ sizeof(uint64_t), imp
->field_len
+ imp
->data_size
);
410 imp
->state
= IMPORTER_STATE_DATA_FINISH
;
412 return 0; /* continue */
415 case IMPORTER_STATE_DATA_FINISH
:
416 r
= get_data_newline(imp
);
417 // log_debug("get_data_newline() -> %d", r);
421 imp
->state
= IMPORTER_STATE_EOF
;
426 imp
->state
= IMPORTER_STATE_LINE
;
428 return 0; /* continue */
430 assert_not_reached();
434 int journal_importer_push_data(JournalImporter
*imp
, const char *data
, size_t size
) {
436 assert(imp
->state
!= IMPORTER_STATE_EOF
);
438 if (!realloc_buffer(imp
, imp
->filled
+ size
))
439 return log_error_errno(ENOMEM
,
440 "Failed to store received data of size %zu "
441 "(in addition to existing %zu bytes with %zu filled): %m",
442 size
, MALLOC_SIZEOF_SAFE(imp
->buf
), imp
->filled
);
444 memcpy(imp
->buf
+ imp
->filled
, data
, size
);
450 void journal_importer_drop_iovw(JournalImporter
*imp
) {
451 size_t remain
, target
;
453 /* This function drops processed data that along with the iovw that points at it */
455 iovw_free_contents(&imp
->iovw
, false);
457 /* possibly reset buffer position */
458 remain
= imp
->filled
- imp
->offset
;
460 if (remain
== 0) /* no brainer */
461 imp
->offset
= imp
->scanned
= imp
->filled
= 0;
462 else if (imp
->offset
> MALLOC_SIZEOF_SAFE(imp
->buf
) - imp
->filled
&&
463 imp
->offset
> remain
) {
464 memcpy(imp
->buf
, imp
->buf
+ imp
->offset
, remain
);
465 imp
->offset
= imp
->scanned
= 0;
466 imp
->filled
= remain
;
469 target
= MALLOC_SIZEOF_SAFE(imp
->buf
);
470 while (target
> 16 * LINE_CHUNK
&& imp
->filled
< target
/ 2)
472 if (target
< MALLOC_SIZEOF_SAFE(imp
->buf
)) {
476 old_size
= MALLOC_SIZEOF_SAFE(imp
->buf
);
478 tmp
= realloc(imp
->buf
, target
);
480 log_warning("Failed to reallocate buffer to (smaller) size %zu",
483 log_debug("Reallocated buffer from %zu to %zu bytes",
490 bool journal_importer_eof(const JournalImporter
*imp
) {
491 return imp
->state
== IMPORTER_STATE_EOF
;