/***
  This file is part of systemd.

  Copyright 2014 Zbigniew Jędrzejewski-Szmek

  systemd is free software; you can redistribute it and/or modify it
  under the terms of the GNU Lesser General Public License as published by
  the Free Software Foundation; either version 2.1 of the License, or
  (at your option) any later version.

  systemd is distributed in the hope that it will be useful, but
  WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/

#include <errno.h>
#include <unistd.h>

#include "alloc-util.h"
#include "fd-util.h"
#include "journal-importer.h"
#include "parse-util.h"
#include "string-util.h"
#include "unaligned.h"

/* States of the importer's parsing state machine (stored in imp->state). */
enum {
        IMPORTER_STATE_LINE = 0,    /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START,  /* reading binary data header */
        IMPORTER_STATE_DATA,        /* reading binary data */
        IMPORTER_STATE_DATA_FINISH, /* expecting newline */
        IMPORTER_STATE_EOF,         /* done */
};

37 static int iovw_put(struct iovec_wrapper
*iovw
, void* data
, size_t len
) {
38 if (!GREEDY_REALLOC(iovw
->iovec
, iovw
->size_bytes
, iovw
->count
+ 1))
41 iovw
->iovec
[iovw
->count
++] = (struct iovec
) {data
, len
};
45 static void iovw_free_contents(struct iovec_wrapper
*iovw
) {
46 iovw
->iovec
= mfree(iovw
->iovec
);
47 iovw
->size_bytes
= iovw
->count
= 0;
50 static void iovw_rebase(struct iovec_wrapper
*iovw
, char *old
, char *new) {
53 for (i
= 0; i
< iovw
->count
; i
++)
54 iovw
->iovec
[i
].iov_base
= (char*) iovw
->iovec
[i
].iov_base
- old
+ new;
57 size_t iovw_size(struct iovec_wrapper
*iovw
) {
60 for (i
= 0; i
< iovw
->count
; i
++)
61 n
+= iovw
->iovec
[i
].iov_len
;
66 void journal_importer_cleanup(JournalImporter
*imp
) {
67 if (imp
->fd
>= 0 && !imp
->passive_fd
) {
68 log_debug("Closing %s (fd=%d)", imp
->name
?: "importer", imp
->fd
);
74 iovw_free_contents(&imp
->iovw
);
77 static char* realloc_buffer(JournalImporter
*imp
, size_t size
) {
78 char *b
, *old
= imp
->buf
;
80 b
= GREEDY_REALLOC(imp
->buf
, imp
->size
, size
);
84 iovw_rebase(&imp
->iovw
, old
, imp
->buf
);
89 static int get_line(JournalImporter
*imp
, char **line
, size_t *size
) {
94 assert(imp
->state
== IMPORTER_STATE_LINE
);
95 assert(imp
->offset
<= imp
->filled
);
96 assert(imp
->filled
<= imp
->size
);
97 assert(imp
->buf
== NULL
|| imp
->size
> 0);
102 size_t start
= MAX(imp
->scanned
, imp
->offset
);
104 c
= memchr(imp
->buf
+ start
, '\n',
105 imp
->filled
- start
);
110 imp
->scanned
= imp
->filled
;
111 if (imp
->scanned
>= DATA_SIZE_MAX
) {
112 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
117 /* we have to wait for some data to come to us */
120 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
121 we reallocate it, we'll increase the size at least a bit. */
122 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
123 if (imp
->size
- imp
->filled
< LINE_CHUNK
&&
124 !realloc_buffer(imp
, MIN(imp
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
128 assert(imp
->size
- imp
->filled
>= LINE_CHUNK
||
129 imp
->size
== ENTRY_SIZE_MAX
);
132 imp
->buf
+ imp
->filled
,
133 imp
->size
- imp
->filled
);
136 log_error_errno(errno
, "read(%d, ..., %zu): %m",
138 imp
->size
- imp
->filled
);
146 *line
= imp
->buf
+ imp
->offset
;
147 *size
= c
+ 1 - imp
->buf
- imp
->offset
;
148 imp
->offset
+= *size
;
153 static int fill_fixed_size(JournalImporter
*imp
, void **data
, size_t size
) {
156 assert(imp
->state
== IMPORTER_STATE_DATA_START
||
157 imp
->state
== IMPORTER_STATE_DATA
||
158 imp
->state
== IMPORTER_STATE_DATA_FINISH
);
159 assert(size
<= DATA_SIZE_MAX
);
160 assert(imp
->offset
<= imp
->filled
);
161 assert(imp
->filled
<= imp
->size
);
162 assert(imp
->buf
!= NULL
|| imp
->size
== 0);
163 assert(imp
->buf
== NULL
|| imp
->size
> 0);
164 assert(imp
->fd
>= 0);
167 while (imp
->filled
- imp
->offset
< size
) {
171 /* we have to wait for some data to come to us */
174 if (!realloc_buffer(imp
, imp
->offset
+ size
))
177 n
= read(imp
->fd
, imp
->buf
+ imp
->filled
,
178 imp
->size
- imp
->filled
);
181 log_error_errno(errno
, "read(%d, ..., %zu): %m", imp
->fd
,
182 imp
->size
- imp
->filled
);
190 *data
= imp
->buf
+ imp
->offset
;
196 static int get_data_size(JournalImporter
*imp
) {
201 assert(imp
->state
== IMPORTER_STATE_DATA_START
);
202 assert(imp
->data_size
== 0);
204 r
= fill_fixed_size(imp
, &data
, sizeof(uint64_t));
208 imp
->data_size
= unaligned_read_le64(data
);
209 if (imp
->data_size
> DATA_SIZE_MAX
) {
210 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
211 imp
->data_size
, DATA_SIZE_MAX
);
214 if (imp
->data_size
== 0)
215 log_warning("Binary field with zero length");
220 static int get_data_data(JournalImporter
*imp
, void **data
) {
225 assert(imp
->state
== IMPORTER_STATE_DATA
);
227 r
= fill_fixed_size(imp
, data
, imp
->data_size
);
234 static int get_data_newline(JournalImporter
*imp
) {
239 assert(imp
->state
== IMPORTER_STATE_DATA_FINISH
);
241 r
= fill_fixed_size(imp
, (void**) &data
, 1);
247 log_error("expected newline, got '%c'", *data
);
254 static int process_dunder(JournalImporter
*imp
, char *line
, size_t n
) {
255 const char *timestamp
;
260 assert(line
[n
-1] == '\n');
262 /* XXX: is it worth to support timestamps in extended format?
263 * We don't produce them, but who knows... */
265 timestamp
= startswith(line
, "__CURSOR=");
267 /* ignore __CURSOR */
270 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
272 long long unsigned x
;
274 r
= safe_atollu(timestamp
, &x
);
276 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
278 imp
->ts
.realtime
= x
;
279 return r
< 0 ? r
: 1;
282 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
284 long long unsigned x
;
286 r
= safe_atollu(timestamp
, &x
);
288 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
290 imp
->ts
.monotonic
= x
;
291 return r
< 0 ? r
: 1;
294 timestamp
= startswith(line
, "__");
296 log_notice("Unknown dunder line %s", line
);
304 int journal_importer_process_data(JournalImporter
*imp
) {
308 case IMPORTER_STATE_LINE
: {
312 assert(imp
->data_size
== 0);
314 r
= get_line(imp
, &line
, &n
);
318 imp
->state
= IMPORTER_STATE_EOF
;
322 assert(line
[n
-1] == '\n');
325 log_trace("Received empty line, event is ready");
329 r
= process_dunder(imp
, line
, n
);
331 return r
< 0 ? r
: 0;
336 LLLLLLLL0011223344...\n
338 sep
= memchr(line
, '=', n
);
343 r
= iovw_put(&imp
->iovw
, line
, n
);
347 /* replace \n with = */
351 imp
->state
= IMPORTER_STATE_DATA_START
;
353 /* we cannot put the field in iovec until we have all data */
356 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
358 return 0; /* continue */
361 case IMPORTER_STATE_DATA_START
:
362 assert(imp
->data_size
== 0);
364 r
= get_data_size(imp
);
365 // log_debug("get_data_size() -> %d", r);
369 imp
->state
= IMPORTER_STATE_EOF
;
373 imp
->state
= imp
->data_size
> 0 ?
374 IMPORTER_STATE_DATA
: IMPORTER_STATE_DATA_FINISH
;
376 return 0; /* continue */
378 case IMPORTER_STATE_DATA
: {
382 assert(imp
->data_size
> 0);
384 r
= get_data_data(imp
, &data
);
385 // log_debug("get_data_data() -> %d", r);
389 imp
->state
= IMPORTER_STATE_EOF
;
395 field
= (char*) data
- sizeof(uint64_t) - imp
->field_len
;
396 memmove(field
+ sizeof(uint64_t), field
, imp
->field_len
);
398 r
= iovw_put(&imp
->iovw
, field
+ sizeof(uint64_t), imp
->field_len
+ imp
->data_size
);
402 imp
->state
= IMPORTER_STATE_DATA_FINISH
;
404 return 0; /* continue */
407 case IMPORTER_STATE_DATA_FINISH
:
408 r
= get_data_newline(imp
);
409 // log_debug("get_data_newline() -> %d", r);
413 imp
->state
= IMPORTER_STATE_EOF
;
418 imp
->state
= IMPORTER_STATE_LINE
;
420 return 0; /* continue */
422 assert_not_reached("wtf?");
426 int journal_importer_push_data(JournalImporter
*imp
, const char *data
, size_t size
) {
428 assert(imp
->state
!= IMPORTER_STATE_EOF
);
430 if (!realloc_buffer(imp
, imp
->filled
+ size
)) {
431 log_error("Failed to store received data of size %zu "
432 "(in addition to existing %zu bytes with %zu filled): %s",
433 size
, imp
->size
, imp
->filled
, strerror(ENOMEM
));
437 memcpy(imp
->buf
+ imp
->filled
, data
, size
);
443 void journal_importer_drop_iovw(JournalImporter
*imp
) {
444 size_t remain
, target
;
446 /* This function drops processed data that along with the iovw that points at it */
448 iovw_free_contents(&imp
->iovw
);
450 /* possibly reset buffer position */
451 remain
= imp
->filled
- imp
->offset
;
453 if (remain
== 0) /* no brainer */
454 imp
->offset
= imp
->scanned
= imp
->filled
= 0;
455 else if (imp
->offset
> imp
->size
- imp
->filled
&&
456 imp
->offset
> remain
) {
457 memcpy(imp
->buf
, imp
->buf
+ imp
->offset
, remain
);
458 imp
->offset
= imp
->scanned
= 0;
459 imp
->filled
= remain
;
463 while (target
> 16 * LINE_CHUNK
&& imp
->filled
< target
/ 2)
465 if (target
< imp
->size
) {
468 tmp
= realloc(imp
->buf
, target
);
470 log_warning("Failed to reallocate buffer to (smaller) size %zu",
473 log_debug("Reallocated buffer from %zu to %zu bytes",
481 bool journal_importer_eof(const JournalImporter
*imp
) {
482 return imp
->state
== IMPORTER_STATE_EOF
;