]>
git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "alloc-util.h"
24 #include "journal-remote-parse.h"
25 #include "journald-native.h"
26 #include "parse-util.h"
27 #include "string-util.h"
/* Granularity, in bytes, by which the line-reading buffer is grown.
 * The replacement list is parenthesized so that the macro expands as a
 * single operand in any expression (for example x / LINE_CHUNK). */
#define LINE_CHUNK (8*1024u)
/* Releases what a RemoteSource holds. In the lines visible here the fd is
 * closed (unless it is negative or flagged passive_fd), the iovw contents
 * are freed, and the writer and both event sources are unreferenced.
 * NOTE(review): the embedded original line numbers jump (31 -> 35,
 * 37 -> 42, 48 -> 54), so parts of the body, including the closing
 * braces, are missing from this view. */
31 void source_free(RemoteSource
*source
) {
/* The fd is closed only when it is valid and not marked as passive. */
35 if (source
->fd
>= 0 && !source
->passive_fd
) {
36 log_debug("Closing fd:%d (%s)", source
->fd
, source
->name
);
37 safe_close(source
->fd
);
/* Drop the collected iovec entries. */
42 iovw_free_contents(&source
->iovw
);
/* Log the writer reference count, then give up our reference. */
44 log_debug("Writer ref count %i", source
->writer
->n_ref
);
45 writer_unref(source
->writer
);
/* Release both event sources attached to this source. */
47 sd_event_source_unref(source
->event
);
48 sd_event_source_unref(source
->buffer_event
);
54 * Initialize zero-filled source with given values. On success, takes
55 * ownership of fd and writer, otherwise does not touch them.
/* Creates a RemoteSource for the given fd, name and writer.
 * Visible steps: log the creation, allocate one RemoteSource with new0,
 * then store passive_fd and writer in it.
 * NOTE(review): the embedded line numbers skip 58-60, 63-65, 67-70, 72
 * and 74-77, so the declaration of the local, the allocation check, the
 * remaining field assignments and the return are not visible here. */
57 RemoteSource
* source_new(int fd
, bool passive_fd
, char *name
, Writer
*writer
) {
61 log_debug("Creating source for %sfd:%d (%s)",
62 passive_fd
? "passive " : "", fd
, name
);
/* Allocate a single RemoteSource (new0 is a project helper; presumably
 * zero-initializing - TODO confirm). */
66 source
= new0(RemoteSource
, 1);
71 source
->passive_fd
= passive_fd
;
73 source
->writer
= writer
;
/* Grows source->buf through GREEDY_REALLOC so it can hold at least size
 * bytes (source->size is passed as the allocation bookkeeping variable),
 * then calls iovw_rebase with the old and new buffer addresses.
 * Callers in this file treat a false/NULL result as failure.
 * NOTE(review): the failure check after GREEDY_REALLOC and the return
 * statement are not visible in this view. */
78 static char* realloc_buffer(RemoteSource
*source
, size_t size
) {
/* Remember the current buffer address before it may move. */
79 char *b
, *old
= source
->buf
;
81 b
= GREEDY_REALLOC(source
->buf
, source
->size
, size
);
/* Presumably re-points iovec entries that referenced the old buffer;
 * iovw_rebase is defined elsewhere - verify. */
85 iovw_rebase(&source
->iovw
, old
, source
->buf
);
/* Extracts the next newline-terminated line from the source buffer.
 * On success *line points at source->buf + source->offset and *size is
 * the line length including the newline; source->offset is advanced past
 * the line. While no newline is buffered, more data is read from
 * source->fd into the free tail of the buffer.
 * NOTE(review): loop and branch headers, local declarations and all
 * return statements are missing from this view (the embedded line
 * numbers have gaps), so the exact control flow and return values must be
 * confirmed against the full file. */
90 static int get_line(RemoteSource
*source
, char **line
, size_t *size
) {
/* Preconditions: line state, consistent buffer bookkeeping, valid fd. */
95 assert(source
->state
== STATE_LINE
);
96 assert(source
->offset
<= source
->filled
);
97 assert(source
->filled
<= source
->size
);
98 assert(source
->buf
== NULL
|| source
->size
> 0);
99 assert(source
->fd
>= 0);
/* Start searching after whatever was already scanned, but never before
 * the current read offset. */
103 size_t start
= MAX(source
->scanned
, source
->offset
);
105 c
= memchr(source
->buf
+ start
, '\n',
106 source
->filled
- start
);
/* Everything buffered so far has now been scanned (presumably reached
 * only when no newline was found - confirm). */
111 source
->scanned
= source
->filled
;
/* Refuse to keep accumulating once DATA_SIZE_MAX bytes were scanned. */
112 if (source
->scanned
>= DATA_SIZE_MAX
) {
113 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
117 if (source
->passive_fd
)
118 /* we have to wait for some data to come to us */
121 /* We know that source->filled is at most DATA_SIZE_MAX, so if
122 we reallocate it, we'll increase the size at least a bit. */
123 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
/* Make sure at least LINE_CHUNK bytes are free, capped at
 * ENTRY_SIZE_MAX total. */
124 if (source
->size
- source
->filled
< LINE_CHUNK
&&
125 !realloc_buffer(source
, MIN(source
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
129 assert(source
->size
- source
->filled
>= LINE_CHUNK
||
130 source
->size
== ENTRY_SIZE_MAX
);
/* Arguments of the read call: destination is the free tail of the
 * buffer, length is the free space (the call line itself is not
 * visible). */
133 source
->buf
+ source
->filled
,
134 source
->size
- source
->filled
);
137 log_error_errno(errno
, "read(%d, ..., %zu): %m",
139 source
->size
- source
->filled
);
/* Hand out the line (newline included) and consume it. */
147 *line
= source
->buf
+ source
->offset
;
148 *size
= c
+ 1 - source
->buf
- source
->offset
;
149 source
->offset
+= *size
;
/* Appends size bytes from data to the buffer of the source.
 * The buffer is first grown to hold source->filled + size bytes; if that
 * fails an error mentioning ENOMEM is logged. Otherwise the bytes are
 * copied to source->buf + source->filled and source->filled is increased.
 * The source must not be in STATE_EOF.
 * NOTE(review): the return statements are not visible in this view. */
154 int push_data(RemoteSource
*source
, const char *data
, size_t size
) {
156 assert(source
->state
!= STATE_EOF
);
/* Ensure room for the existing content plus the new bytes. */
158 if (!realloc_buffer(source
, source
->filled
+ size
)) {
159 log_error("Failed to store received data of size %zu "
160 "(in addition to existing %zu bytes with %zu filled): %s",
161 size
, source
->size
, source
->filled
, strerror(ENOMEM
));
/* Copy the new bytes right after the already filled part. */
165 memcpy(source
->buf
+ source
->filled
, data
, size
);
166 source
->filled
+= size
;
/* Makes exactly size bytes available starting at source->offset.
 * While fewer than size unread bytes are buffered, the buffer is grown to
 * source->offset + size and more data is read from source->fd. Once
 * enough is buffered, *data points at source->buf + source->offset and
 * source->offset is advanced by size.
 * Only valid in the STATE_DATA_START, STATE_DATA and STATE_DATA_FINISH
 * states, with size <= DATA_SIZE_MAX.
 * NOTE(review): the return statements and the checks on the read result
 * are not visible in this view. */
171 static int fill_fixed_size(RemoteSource
*source
, void **data
, size_t size
) {
174 assert(source
->state
== STATE_DATA_START
||
175 source
->state
== STATE_DATA
||
176 source
->state
== STATE_DATA_FINISH
);
177 assert(size
<= DATA_SIZE_MAX
);
178 assert(source
->offset
<= source
->filled
);
179 assert(source
->filled
<= source
->size
);
180 assert(source
->buf
!= NULL
|| source
->size
== 0);
181 assert(source
->buf
== NULL
|| source
->size
> 0);
182 assert(source
->fd
>= 0);
/* Loop until the unread part of the buffer covers the request. */
185 while (source
->filled
- source
->offset
< size
) {
188 if (source
->passive_fd
)
189 /* we have to wait for some data to come to us */
/* Grow the buffer so the whole requested span fits after offset. */
192 if (!realloc_buffer(source
, source
->offset
+ size
))
/* Read into the free tail of the buffer. */
195 n
= read(source
->fd
, source
->buf
+ source
->filled
,
196 source
->size
- source
->filled
);
199 log_error_errno(errno
, "read(%d, ..., %zu): %m", source
->fd
,
200 source
->size
- source
->filled
);
/* Hand out the span and consume it. */
208 *data
= source
->buf
+ source
->offset
;
209 source
->offset
+= size
;
/* Reads the length prefix of a binary field.
 * Fetches sizeof(uint64_t) bytes through fill_fixed_size, converts them
 * from little-endian with le64toh and stores the result in
 * source->data_size. A value above DATA_SIZE_MAX is logged as an error;
 * a value of zero only produces a warning.
 * Requires STATE_DATA_START and a data_size of 0 on entry.
 * NOTE(review): the handling of r and the return statements are not
 * visible in this view. */
214 static int get_data_size(RemoteSource
*source
) {
219 assert(source
->state
== STATE_DATA_START
);
220 assert(source
->data_size
== 0);
222 r
= fill_fixed_size(source
, &data
, sizeof(uint64_t));
/* NOTE(review): this dereferences data as uint64_t directly; the buffer
 * position is not necessarily 8-byte aligned - confirm this is
 * acceptable on the target platforms. */
226 source
->data_size
= le64toh( *(uint64_t *) data
);
227 if (source
->data_size
> DATA_SIZE_MAX
) {
228 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
229 source
->data_size
, DATA_SIZE_MAX
);
232 if (source
->data_size
== 0)
233 log_warning("Binary field with zero length");
/* Fetches the payload of a binary field: requests source->data_size bytes
 * through fill_fixed_size, which stores their address in *data.
 * Requires STATE_DATA.
 * NOTE(review): the handling of r and the return statements are not
 * visible in this view. */
238 static int get_data_data(RemoteSource
*source
, void **data
) {
243 assert(source
->state
== STATE_DATA
);
245 r
= fill_fixed_size(source
, data
, source
->data_size
);
/* Consumes the single byte that follows a binary field payload and
 * logs an error naming the byte that was found when it is not the
 * expected newline.
 * Requires STATE_DATA_FINISH.
 * NOTE(review): the comparison guarding the error message and the return
 * statements are not visible in this view. */
252 static int get_data_newline(RemoteSource
*source
) {
257 assert(source
->state
== STATE_DATA_FINISH
);
259 r
= fill_fixed_size(source
, (void**) &data
, 1);
265 log_error("expected newline, got '%c'", *data
);
/* Handles lines whose field name starts with two underscores.
 * The line (n bytes, ending in a newline) is matched against the prefixes
 * "__CURSOR=", "__REALTIME_TIMESTAMP=" and "__MONOTONIC_TIMESTAMP=".
 * The two timestamps are parsed with safe_atollu and stored in
 * source->ts.realtime and source->ts.monotonic; those branches return the
 * negative parse result on failure and 1 otherwise. Any other line
 * starting with "__" is reported with log_notice.
 * NOTE(review): the if-conditions around each startswith result and the
 * remaining return statements are not visible in this view. */
272 static int process_dunder(RemoteSource
*source
, char *line
, size_t n
) {
273 const char *timestamp
;
278 assert(line
[n
-1] == '\n');
280 /* XXX: is it worth to support timestamps in extended format?
281 * We don't produce them, but who knows... */
283 timestamp
= startswith(line
, "__CURSOR=");
285 /* ignore __CURSOR */
/* Wallclock timestamp of the entry. */
288 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
290 long long unsigned x
;
292 r
= safe_atollu(timestamp
, &x
);
294 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
296 source
->ts
.realtime
= x
;
297 return r
< 0 ? r
: 1;
/* Monotonic timestamp of the entry. */
300 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
302 long long unsigned x
;
304 r
= safe_atollu(timestamp
, &x
);
306 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
308 source
->ts
.monotonic
= x
;
309 return r
< 0 ? r
: 1;
/* Any other double-underscore field is unknown. */
312 timestamp
= startswith(line
, "__");
314 log_notice("Unknown dunder line %s", line
);
/* Advances the parser of the source by one step, depending on
 * source->state. Visible behaviour per state:
 *  - line state (case label not visible): fetch a line with get_line;
 *    an empty line means the event is ready; lines are offered to
 *    process_dunder; a line containing '=' is stored with iovw_put;
 *    otherwise the line length is remembered in field_len and the state
 *    becomes STATE_DATA_START.
 *  - STATE_DATA_START: read the binary length with get_data_size, then
 *    continue with STATE_DATA, or STATE_DATA_FINISH when the length is 0.
 *  - data state (case label not visible): fetch the payload, move the
 *    field name so that it is adjacent to the payload, store the result
 *    with iovw_put, then continue with STATE_DATA_FINISH.
 *  - STATE_DATA_FINISH: consume the trailing newline, reset data_size
 *    and go back to STATE_LINE.
 * Several places switch to STATE_EOF; a return of 0 is commented as
 * "continue" in the code.
 * NOTE(review): the conditions guarding most statements, some case
 * labels and several returns are missing from this view. */
322 static int process_data(RemoteSource
*source
) {
325 switch(source
->state
) {
330 assert(source
->data_size
== 0);
332 r
= get_line(source
, &line
, &n
);
336 source
->state
= STATE_EOF
;
340 assert(line
[n
-1] == '\n');
343 log_trace("Received empty line, event is ready");
347 r
= process_dunder(source
, line
, n
);
349 return r
< 0 ? r
: 0;
354 LLLLLLLL0011223344...\n
/* A '=' in the line marks a text field; its absence a binary one (see
 * the trace message below). */
356 sep
= memchr(line
, '=', n
);
361 r
= iovw_put(&source
->iovw
, line
, n
);
365 /* replace \n with = */
368 source
->field_len
= n
;
369 source
->state
= STATE_DATA_START
;
371 /* we cannot put the field in iovec until we have all data */
374 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
376 return 0; /* continue */
379 case STATE_DATA_START
:
380 assert(source
->data_size
== 0);
382 r
= get_data_size(source
);
383 // log_debug("get_data_size() -> %d", r);
387 source
->state
= STATE_EOF
;
/* A zero-length payload skips straight to the trailing newline. */
391 source
->state
= source
->data_size
> 0 ?
392 STATE_DATA
: STATE_DATA_FINISH
;
394 return 0; /* continue */
400 assert(source
->data_size
> 0);
402 r
= get_data_data(source
, &data
);
403 // log_debug("get_data_data() -> %d", r);
407 source
->state
= STATE_EOF
;
/* The buffer holds: field name, 8-byte length, payload. Shift the field
 * name forward over the length bytes so name and payload are contiguous,
 * then store that contiguous span. */
413 field
= (char*) data
- sizeof(uint64_t) - source
->field_len
;
414 memmove(field
+ sizeof(uint64_t), field
, source
->field_len
);
416 r
= iovw_put(&source
->iovw
, field
+ sizeof(uint64_t), source
->field_len
+ source
->data_size
);
420 source
->state
= STATE_DATA_FINISH
;
422 return 0; /* continue */
425 case STATE_DATA_FINISH
:
426 r
= get_data_newline(source
);
427 // log_debug("get_data_newline() -> %d", r);
431 source
->state
= STATE_EOF
;
/* Field complete: reset the length and expect a new line next. */
435 source
->data_size
= 0;
436 source
->state
= STATE_LINE
;
438 return 0; /* continue */
440 assert_not_reached("wtf?");
/* Runs one parsing step on the source and, once a full event has been
 * collected, writes it out.
 * Visible steps: call process_data; log the full event; warn and skip
 * when the iovw holds no entries; otherwise pass the iovw and timestamps
 * to writer_write together with the compress and seal flags, logging an
 * error on failure; free the iovw contents; finally tidy up the buffer
 * (reset or compact the read position, and possibly shrink the
 * allocation).
 * NOTE(review): the checks on r, the returns, the body of the shrinking
 * loop and the end of the function are not visible in this view. */
444 int process_source(RemoteSource
*source
, bool compress
, bool seal
) {
445 size_t remain
, target
;
449 assert(source
->writer
);
451 r
= process_data(source
);
455 /* We have a full event */
456 log_trace("Received full event from source@%p fd:%d (%s)",
457 source
, source
->fd
, source
->name
);
459 if (!source
->iovw
.count
) {
460 log_warning("Entry with no payload, skipping");
464 assert(source
->iovw
.iovec
);
465 assert(source
->iovw
.count
);
467 r
= writer_write(source
->writer
, &source
->iovw
, &source
->ts
, compress
, seal
);
469 log_error_errno(r
, "Failed to write entry of %zu bytes: %m",
470 iovw_size(&source
->iovw
));
475 iovw_free_contents(&source
->iovw
);
477 /* possibly reset buffer position */
/* Number of buffered bytes that have not been consumed yet. */
478 remain
= source
->filled
- source
->offset
;
480 if (remain
== 0) /* no brainer */
481 source
->offset
= source
->scanned
= source
->filled
= 0;
/* Compact only when the consumed prefix is larger than both the free
 * tail and the unread remainder; offset > remain also means the source
 * and destination of the copy below do not overlap. */
482 else if (source
->offset
> source
->size
- source
->filled
&&
483 source
->offset
> remain
) {
484 memcpy(source
->buf
, source
->buf
+ source
->offset
, remain
);
485 source
->offset
= source
->scanned
= 0;
486 source
->filled
= remain
;
/* Look for a smaller allocation size while the buffer is larger than
 * 16 * LINE_CHUNK and less than half of the candidate size is in use
 * (the loop body is not visible in this view). */
489 target
= source
->size
;
490 while (target
> 16 * LINE_CHUNK
&& remain
< target
/ 2)
492 if (target
< source
->size
) {
495 tmp
= realloc(source
->buf
, target
);
497 log_warning("Failed to reallocate buffer to (smaller) size %zu",
500 log_debug("Reallocated buffer from %zu to %zu bytes",
501 source
->size
, target
);
503 source
->size
= target
;