/* SPDX-License-Identifier: LGPL-2.1+ */
/***
  This file is part of systemd.

  Copyright 2014 Zbigniew Jędrzejewski-Szmek

  systemd is free software; you can redistribute it and/or modify it
  under the terms of the GNU Lesser General Public License as published by
  the Free Software Foundation; either version 2.1 of the License, or
  (at your option) any later version.

  systemd is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/

#include <errno.h>
#include <inttypes.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "alloc-util.h"
#include "journal-importer.h"
#include "parse-util.h"
#include "string-util.h"
#include "unaligned.h"

/* Parser states of an importer, stored in JournalImporter.state. */
enum {
        IMPORTER_STATE_LINE = 0,    /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START,  /* reading binary data header */
        IMPORTER_STATE_DATA,        /* reading binary data */
        IMPORTER_STATE_DATA_FINISH, /* expecting newline */
        IMPORTER_STATE_EOF,         /* done */
};

40 static int iovw_put(struct iovec_wrapper
*iovw
, void* data
, size_t len
) {
41 if (!GREEDY_REALLOC(iovw
->iovec
, iovw
->size_bytes
, iovw
->count
+ 1))
44 iovw
->iovec
[iovw
->count
++] = IOVEC_MAKE(data
, len
);
48 static void iovw_free_contents(struct iovec_wrapper
*iovw
) {
49 iovw
->iovec
= mfree(iovw
->iovec
);
50 iovw
->size_bytes
= iovw
->count
= 0;
53 static void iovw_rebase(struct iovec_wrapper
*iovw
, char *old
, char *new) {
56 for (i
= 0; i
< iovw
->count
; i
++)
57 iovw
->iovec
[i
].iov_base
= (char*) iovw
->iovec
[i
].iov_base
- old
+ new;
60 size_t iovw_size(struct iovec_wrapper
*iovw
) {
63 for (i
= 0; i
< iovw
->count
; i
++)
64 n
+= iovw
->iovec
[i
].iov_len
;
69 void journal_importer_cleanup(JournalImporter
*imp
) {
70 if (imp
->fd
>= 0 && !imp
->passive_fd
) {
71 log_debug("Closing %s (fd=%d)", imp
->name
?: "importer", imp
->fd
);
77 iovw_free_contents(&imp
->iovw
);
80 static char* realloc_buffer(JournalImporter
*imp
, size_t size
) {
81 char *b
, *old
= imp
->buf
;
83 b
= GREEDY_REALLOC(imp
->buf
, imp
->size
, size
);
87 iovw_rebase(&imp
->iovw
, old
, imp
->buf
);
92 static int get_line(JournalImporter
*imp
, char **line
, size_t *size
) {
97 assert(imp
->state
== IMPORTER_STATE_LINE
);
98 assert(imp
->offset
<= imp
->filled
);
99 assert(imp
->filled
<= imp
->size
);
100 assert(!imp
->buf
|| imp
->size
> 0);
101 assert(imp
->fd
>= 0);
105 size_t start
= MAX(imp
->scanned
, imp
->offset
);
107 c
= memchr(imp
->buf
+ start
, '\n',
108 imp
->filled
- start
);
113 imp
->scanned
= imp
->filled
;
114 if (imp
->scanned
>= DATA_SIZE_MAX
) {
115 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
120 /* we have to wait for some data to come to us */
123 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
124 we reallocate it, we'll increase the size at least a bit. */
125 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
126 if (imp
->size
- imp
->filled
< LINE_CHUNK
&&
127 !realloc_buffer(imp
, MIN(imp
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
131 assert(imp
->size
- imp
->filled
>= LINE_CHUNK
||
132 imp
->size
== ENTRY_SIZE_MAX
);
135 imp
->buf
+ imp
->filled
,
136 imp
->size
- imp
->filled
);
139 log_error_errno(errno
, "read(%d, ..., %zu): %m",
141 imp
->size
- imp
->filled
);
149 *line
= imp
->buf
+ imp
->offset
;
150 *size
= c
+ 1 - imp
->buf
- imp
->offset
;
151 imp
->offset
+= *size
;
156 static int fill_fixed_size(JournalImporter
*imp
, void **data
, size_t size
) {
159 assert(IN_SET(imp
->state
, IMPORTER_STATE_DATA_START
, IMPORTER_STATE_DATA
, IMPORTER_STATE_DATA_FINISH
));
160 assert(size
<= DATA_SIZE_MAX
);
161 assert(imp
->offset
<= imp
->filled
);
162 assert(imp
->filled
<= imp
->size
);
163 assert(imp
->buf
|| imp
->size
== 0);
164 assert(!imp
->buf
|| imp
->size
> 0);
165 assert(imp
->fd
>= 0);
168 while (imp
->filled
- imp
->offset
< size
) {
172 /* we have to wait for some data to come to us */
175 if (!realloc_buffer(imp
, imp
->offset
+ size
))
178 n
= read(imp
->fd
, imp
->buf
+ imp
->filled
,
179 imp
->size
- imp
->filled
);
182 log_error_errno(errno
, "read(%d, ..., %zu): %m", imp
->fd
,
183 imp
->size
- imp
->filled
);
191 *data
= imp
->buf
+ imp
->offset
;
197 static int get_data_size(JournalImporter
*imp
) {
202 assert(imp
->state
== IMPORTER_STATE_DATA_START
);
203 assert(imp
->data_size
== 0);
205 r
= fill_fixed_size(imp
, &data
, sizeof(uint64_t));
209 imp
->data_size
= unaligned_read_le64(data
);
210 if (imp
->data_size
> DATA_SIZE_MAX
) {
211 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
212 imp
->data_size
, DATA_SIZE_MAX
);
215 if (imp
->data_size
== 0)
216 log_warning("Binary field with zero length");
221 static int get_data_data(JournalImporter
*imp
, void **data
) {
226 assert(imp
->state
== IMPORTER_STATE_DATA
);
228 r
= fill_fixed_size(imp
, data
, imp
->data_size
);
235 static int get_data_newline(JournalImporter
*imp
) {
240 assert(imp
->state
== IMPORTER_STATE_DATA_FINISH
);
242 r
= fill_fixed_size(imp
, (void**) &data
, 1);
248 log_error("expected newline, got '%c'", *data
);
255 static int process_dunder(JournalImporter
*imp
, char *line
, size_t n
) {
256 const char *timestamp
;
261 assert(line
[n
-1] == '\n');
263 /* XXX: is it worth to support timestamps in extended format?
264 * We don't produce them, but who knows... */
266 timestamp
= startswith(line
, "__CURSOR=");
268 /* ignore __CURSOR */
271 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
273 long long unsigned x
;
275 r
= safe_atollu(timestamp
, &x
);
277 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
279 imp
->ts
.realtime
= x
;
280 return r
< 0 ? r
: 1;
283 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
285 long long unsigned x
;
287 r
= safe_atollu(timestamp
, &x
);
289 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
291 imp
->ts
.monotonic
= x
;
292 return r
< 0 ? r
: 1;
295 timestamp
= startswith(line
, "__");
297 log_notice("Unknown dunder line %s", line
);
305 int journal_importer_process_data(JournalImporter
*imp
) {
309 case IMPORTER_STATE_LINE
: {
313 assert(imp
->data_size
== 0);
315 r
= get_line(imp
, &line
, &n
);
319 imp
->state
= IMPORTER_STATE_EOF
;
323 assert(line
[n
-1] == '\n');
326 log_trace("Received empty line, event is ready");
330 r
= process_dunder(imp
, line
, n
);
332 return r
< 0 ? r
: 0;
337 LLLLLLLL0011223344...\n
339 sep
= memchr(line
, '=', n
);
344 r
= iovw_put(&imp
->iovw
, line
, n
);
348 /* replace \n with = */
352 imp
->state
= IMPORTER_STATE_DATA_START
;
354 /* we cannot put the field in iovec until we have all data */
357 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
359 return 0; /* continue */
362 case IMPORTER_STATE_DATA_START
:
363 assert(imp
->data_size
== 0);
365 r
= get_data_size(imp
);
366 // log_debug("get_data_size() -> %d", r);
370 imp
->state
= IMPORTER_STATE_EOF
;
374 imp
->state
= imp
->data_size
> 0 ?
375 IMPORTER_STATE_DATA
: IMPORTER_STATE_DATA_FINISH
;
377 return 0; /* continue */
379 case IMPORTER_STATE_DATA
: {
383 assert(imp
->data_size
> 0);
385 r
= get_data_data(imp
, &data
);
386 // log_debug("get_data_data() -> %d", r);
390 imp
->state
= IMPORTER_STATE_EOF
;
396 field
= (char*) data
- sizeof(uint64_t) - imp
->field_len
;
397 memmove(field
+ sizeof(uint64_t), field
, imp
->field_len
);
399 r
= iovw_put(&imp
->iovw
, field
+ sizeof(uint64_t), imp
->field_len
+ imp
->data_size
);
403 imp
->state
= IMPORTER_STATE_DATA_FINISH
;
405 return 0; /* continue */
408 case IMPORTER_STATE_DATA_FINISH
:
409 r
= get_data_newline(imp
);
410 // log_debug("get_data_newline() -> %d", r);
414 imp
->state
= IMPORTER_STATE_EOF
;
419 imp
->state
= IMPORTER_STATE_LINE
;
421 return 0; /* continue */
423 assert_not_reached("wtf?");
427 int journal_importer_push_data(JournalImporter
*imp
, const char *data
, size_t size
) {
429 assert(imp
->state
!= IMPORTER_STATE_EOF
);
431 if (!realloc_buffer(imp
, imp
->filled
+ size
)) {
432 log_error("Failed to store received data of size %zu "
433 "(in addition to existing %zu bytes with %zu filled): %s",
434 size
, imp
->size
, imp
->filled
, strerror(ENOMEM
));
438 memcpy(imp
->buf
+ imp
->filled
, data
, size
);
444 void journal_importer_drop_iovw(JournalImporter
*imp
) {
445 size_t remain
, target
;
447 /* This function drops processed data that along with the iovw that points at it */
449 iovw_free_contents(&imp
->iovw
);
451 /* possibly reset buffer position */
452 remain
= imp
->filled
- imp
->offset
;
454 if (remain
== 0) /* no brainer */
455 imp
->offset
= imp
->scanned
= imp
->filled
= 0;
456 else if (imp
->offset
> imp
->size
- imp
->filled
&&
457 imp
->offset
> remain
) {
458 memcpy(imp
->buf
, imp
->buf
+ imp
->offset
, remain
);
459 imp
->offset
= imp
->scanned
= 0;
460 imp
->filled
= remain
;
464 while (target
> 16 * LINE_CHUNK
&& imp
->filled
< target
/ 2)
466 if (target
< imp
->size
) {
469 tmp
= realloc(imp
->buf
, target
);
471 log_warning("Failed to reallocate buffer to (smaller) size %zu",
474 log_debug("Reallocated buffer from %zu to %zu bytes",
482 bool journal_importer_eof(const JournalImporter
*imp
) {
483 return imp
->state
== IMPORTER_STATE_EOF
;