1 /* SPDX-License-Identifier: LGPL-2.1+ */
3 This file is part of systemd.
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7 systemd is free software; you can redistribute it and/or modify it
8 under the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 2.1 of the License, or
10 (at your option) any later version.
12 systemd is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
17 You should have received a copy of the GNU Lesser General Public License
18 along with systemd; If not, see <http://www.gnu.org/licenses/>.
23 #include "alloc-util.h"
26 #include "journal-importer.h"
27 #include "parse-util.h"
28 #include "string-util.h"
29 #include "unaligned.h"
32 IMPORTER_STATE_LINE
= 0, /* waiting to read, or reading line */
33 IMPORTER_STATE_DATA_START
, /* reading binary data header */
34 IMPORTER_STATE_DATA
, /* reading binary data */
35 IMPORTER_STATE_DATA_FINISH
, /* expecting newline */
36 IMPORTER_STATE_EOF
, /* done */
39 static int iovw_put(struct iovec_wrapper
*iovw
, void* data
, size_t len
) {
40 if (!GREEDY_REALLOC(iovw
->iovec
, iovw
->size_bytes
, iovw
->count
+ 1))
43 iovw
->iovec
[iovw
->count
++] = IOVEC_MAKE(data
, len
);
47 static void iovw_free_contents(struct iovec_wrapper
*iovw
) {
48 iovw
->iovec
= mfree(iovw
->iovec
);
49 iovw
->size_bytes
= iovw
->count
= 0;
52 static void iovw_rebase(struct iovec_wrapper
*iovw
, char *old
, char *new) {
55 for (i
= 0; i
< iovw
->count
; i
++)
56 iovw
->iovec
[i
].iov_base
= (char*) iovw
->iovec
[i
].iov_base
- old
+ new;
59 size_t iovw_size(struct iovec_wrapper
*iovw
) {
62 for (i
= 0; i
< iovw
->count
; i
++)
63 n
+= iovw
->iovec
[i
].iov_len
;
68 void journal_importer_cleanup(JournalImporter
*imp
) {
69 if (imp
->fd
>= 0 && !imp
->passive_fd
) {
70 log_debug("Closing %s (fd=%d)", imp
->name
?: "importer", imp
->fd
);
76 iovw_free_contents(&imp
->iovw
);
79 static char* realloc_buffer(JournalImporter
*imp
, size_t size
) {
80 char *b
, *old
= imp
->buf
;
82 b
= GREEDY_REALLOC(imp
->buf
, imp
->size
, size
);
86 iovw_rebase(&imp
->iovw
, old
, imp
->buf
);
91 static int get_line(JournalImporter
*imp
, char **line
, size_t *size
) {
96 assert(imp
->state
== IMPORTER_STATE_LINE
);
97 assert(imp
->offset
<= imp
->filled
);
98 assert(imp
->filled
<= imp
->size
);
99 assert(!imp
->buf
|| imp
->size
> 0);
100 assert(imp
->fd
>= 0);
104 size_t start
= MAX(imp
->scanned
, imp
->offset
);
106 c
= memchr(imp
->buf
+ start
, '\n',
107 imp
->filled
- start
);
112 imp
->scanned
= imp
->filled
;
113 if (imp
->scanned
>= DATA_SIZE_MAX
) {
114 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
119 /* we have to wait for some data to come to us */
122 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
123 we reallocate it, we'll increase the size at least a bit. */
124 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
125 if (imp
->size
- imp
->filled
< LINE_CHUNK
&&
126 !realloc_buffer(imp
, MIN(imp
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
130 assert(imp
->size
- imp
->filled
>= LINE_CHUNK
||
131 imp
->size
== ENTRY_SIZE_MAX
);
134 imp
->buf
+ imp
->filled
,
135 imp
->size
- imp
->filled
);
138 log_error_errno(errno
, "read(%d, ..., %zu): %m",
140 imp
->size
- imp
->filled
);
148 *line
= imp
->buf
+ imp
->offset
;
149 *size
= c
+ 1 - imp
->buf
- imp
->offset
;
150 imp
->offset
+= *size
;
155 static int fill_fixed_size(JournalImporter
*imp
, void **data
, size_t size
) {
158 assert(IN_SET(imp
->state
, IMPORTER_STATE_DATA_START
, IMPORTER_STATE_DATA
, IMPORTER_STATE_DATA_FINISH
));
159 assert(size
<= DATA_SIZE_MAX
);
160 assert(imp
->offset
<= imp
->filled
);
161 assert(imp
->filled
<= imp
->size
);
162 assert(imp
->buf
|| imp
->size
== 0);
163 assert(!imp
->buf
|| imp
->size
> 0);
164 assert(imp
->fd
>= 0);
167 while (imp
->filled
- imp
->offset
< size
) {
171 /* we have to wait for some data to come to us */
174 if (!realloc_buffer(imp
, imp
->offset
+ size
))
177 n
= read(imp
->fd
, imp
->buf
+ imp
->filled
,
178 imp
->size
- imp
->filled
);
181 log_error_errno(errno
, "read(%d, ..., %zu): %m", imp
->fd
,
182 imp
->size
- imp
->filled
);
190 *data
= imp
->buf
+ imp
->offset
;
196 static int get_data_size(JournalImporter
*imp
) {
201 assert(imp
->state
== IMPORTER_STATE_DATA_START
);
202 assert(imp
->data_size
== 0);
204 r
= fill_fixed_size(imp
, &data
, sizeof(uint64_t));
208 imp
->data_size
= unaligned_read_le64(data
);
209 if (imp
->data_size
> DATA_SIZE_MAX
) {
210 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
211 imp
->data_size
, DATA_SIZE_MAX
);
214 if (imp
->data_size
== 0)
215 log_warning("Binary field with zero length");
220 static int get_data_data(JournalImporter
*imp
, void **data
) {
225 assert(imp
->state
== IMPORTER_STATE_DATA
);
227 r
= fill_fixed_size(imp
, data
, imp
->data_size
);
234 static int get_data_newline(JournalImporter
*imp
) {
239 assert(imp
->state
== IMPORTER_STATE_DATA_FINISH
);
241 r
= fill_fixed_size(imp
, (void**) &data
, 1);
247 log_error("expected newline, got '%c'", *data
);
254 static int process_dunder(JournalImporter
*imp
, char *line
, size_t n
) {
255 const char *timestamp
;
260 assert(line
[n
-1] == '\n');
262 /* XXX: is it worth to support timestamps in extended format?
263 * We don't produce them, but who knows... */
265 timestamp
= startswith(line
, "__CURSOR=");
267 /* ignore __CURSOR */
270 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
272 long long unsigned x
;
274 r
= safe_atollu(timestamp
, &x
);
276 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
278 imp
->ts
.realtime
= x
;
279 return r
< 0 ? r
: 1;
282 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
284 long long unsigned x
;
286 r
= safe_atollu(timestamp
, &x
);
288 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
290 imp
->ts
.monotonic
= x
;
291 return r
< 0 ? r
: 1;
294 timestamp
= startswith(line
, "__");
296 log_notice("Unknown dunder line %s", line
);
304 int journal_importer_process_data(JournalImporter
*imp
) {
308 case IMPORTER_STATE_LINE
: {
312 assert(imp
->data_size
== 0);
314 r
= get_line(imp
, &line
, &n
);
318 imp
->state
= IMPORTER_STATE_EOF
;
322 assert(line
[n
-1] == '\n');
325 log_trace("Received empty line, event is ready");
329 r
= process_dunder(imp
, line
, n
);
331 return r
< 0 ? r
: 0;
336 LLLLLLLL0011223344...\n
338 sep
= memchr(line
, '=', n
);
343 r
= iovw_put(&imp
->iovw
, line
, n
);
347 /* replace \n with = */
351 imp
->state
= IMPORTER_STATE_DATA_START
;
353 /* we cannot put the field in iovec until we have all data */
356 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
358 return 0; /* continue */
361 case IMPORTER_STATE_DATA_START
:
362 assert(imp
->data_size
== 0);
364 r
= get_data_size(imp
);
365 // log_debug("get_data_size() -> %d", r);
369 imp
->state
= IMPORTER_STATE_EOF
;
373 imp
->state
= imp
->data_size
> 0 ?
374 IMPORTER_STATE_DATA
: IMPORTER_STATE_DATA_FINISH
;
376 return 0; /* continue */
378 case IMPORTER_STATE_DATA
: {
382 assert(imp
->data_size
> 0);
384 r
= get_data_data(imp
, &data
);
385 // log_debug("get_data_data() -> %d", r);
389 imp
->state
= IMPORTER_STATE_EOF
;
395 field
= (char*) data
- sizeof(uint64_t) - imp
->field_len
;
396 memmove(field
+ sizeof(uint64_t), field
, imp
->field_len
);
398 r
= iovw_put(&imp
->iovw
, field
+ sizeof(uint64_t), imp
->field_len
+ imp
->data_size
);
402 imp
->state
= IMPORTER_STATE_DATA_FINISH
;
404 return 0; /* continue */
407 case IMPORTER_STATE_DATA_FINISH
:
408 r
= get_data_newline(imp
);
409 // log_debug("get_data_newline() -> %d", r);
413 imp
->state
= IMPORTER_STATE_EOF
;
418 imp
->state
= IMPORTER_STATE_LINE
;
420 return 0; /* continue */
422 assert_not_reached("wtf?");
426 int journal_importer_push_data(JournalImporter
*imp
, const char *data
, size_t size
) {
428 assert(imp
->state
!= IMPORTER_STATE_EOF
);
430 if (!realloc_buffer(imp
, imp
->filled
+ size
)) {
431 log_error("Failed to store received data of size %zu "
432 "(in addition to existing %zu bytes with %zu filled): %s",
433 size
, imp
->size
, imp
->filled
, strerror(ENOMEM
));
437 memcpy(imp
->buf
+ imp
->filled
, data
, size
);
443 void journal_importer_drop_iovw(JournalImporter
*imp
) {
444 size_t remain
, target
;
446 /* This function drops processed data that along with the iovw that points at it */
448 iovw_free_contents(&imp
->iovw
);
450 /* possibly reset buffer position */
451 remain
= imp
->filled
- imp
->offset
;
453 if (remain
== 0) /* no brainer */
454 imp
->offset
= imp
->scanned
= imp
->filled
= 0;
455 else if (imp
->offset
> imp
->size
- imp
->filled
&&
456 imp
->offset
> remain
) {
457 memcpy(imp
->buf
, imp
->buf
+ imp
->offset
, remain
);
458 imp
->offset
= imp
->scanned
= 0;
459 imp
->filled
= remain
;
463 while (target
> 16 * LINE_CHUNK
&& imp
->filled
< target
/ 2)
465 if (target
< imp
->size
) {
468 tmp
= realloc(imp
->buf
, target
);
470 log_warning("Failed to reallocate buffer to (smaller) size %zu",
473 log_debug("Reallocated buffer from %zu to %zu bytes",
481 bool journal_importer_eof(const JournalImporter
*imp
) {
482 return imp
->state
== IMPORTER_STATE_EOF
;