git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
23 #include "journal-remote-parse.h"
24 #include "journald-native.h"
25 #include "string-util.h"
/* Chunk size, in bytes, used when growing the read buffer in get_line() and
 * as the unit of the shrink threshold (16 * LINE_CHUNK) in process_source().
 * The replacement list is parenthesized so the macro always expands as a
 * single value, whatever operator sits next to it (e.g. x / LINE_CHUNK). */
#define LINE_CHUNK (8 * 1024u)
29 void source_free(RemoteSource
*source
) {
33 if (source
->fd
>= 0 && !source
->passive_fd
) {
34 log_debug("Closing fd:%d (%s)", source
->fd
, source
->name
);
35 safe_close(source
->fd
);
40 iovw_free_contents(&source
->iovw
);
42 log_debug("Writer ref count %i", source
->writer
->n_ref
);
43 writer_unref(source
->writer
);
45 sd_event_source_unref(source
->event
);
46 sd_event_source_unref(source
->buffer_event
);
52 * Initialize zero-filled source with given values. On success, takes
53 * ownership of fd and writer, otherwise does not touch them.
55 RemoteSource
* source_new(int fd
, bool passive_fd
, char *name
, Writer
*writer
) {
59 log_debug("Creating source for %sfd:%d (%s)",
60 passive_fd
? "passive " : "", fd
, name
);
64 source
= new0(RemoteSource
, 1);
69 source
->passive_fd
= passive_fd
;
71 source
->writer
= writer
;
76 static char* realloc_buffer(RemoteSource
*source
, size_t size
) {
77 char *b
, *old
= source
->buf
;
79 b
= GREEDY_REALLOC(source
->buf
, source
->size
, size
);
83 iovw_rebase(&source
->iovw
, old
, source
->buf
);
88 static int get_line(RemoteSource
*source
, char **line
, size_t *size
) {
93 assert(source
->state
== STATE_LINE
);
94 assert(source
->offset
<= source
->filled
);
95 assert(source
->filled
<= source
->size
);
96 assert(source
->buf
== NULL
|| source
->size
> 0);
97 assert(source
->fd
>= 0);
101 size_t start
= MAX(source
->scanned
, source
->offset
);
103 c
= memchr(source
->buf
+ start
, '\n',
104 source
->filled
- start
);
109 source
->scanned
= source
->filled
;
110 if (source
->scanned
>= DATA_SIZE_MAX
) {
111 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
115 if (source
->passive_fd
)
116 /* we have to wait for some data to come to us */
119 /* We know that source->filled is at most DATA_SIZE_MAX, so if
120 we reallocate it, we'll increase the size at least a bit. */
121 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
122 if (source
->size
- source
->filled
< LINE_CHUNK
&&
123 !realloc_buffer(source
, MIN(source
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
127 assert(source
->size
- source
->filled
>= LINE_CHUNK
||
128 source
->size
== ENTRY_SIZE_MAX
);
131 source
->buf
+ source
->filled
,
132 source
->size
- source
->filled
);
135 log_error_errno(errno
, "read(%d, ..., %zu): %m",
137 source
->size
- source
->filled
);
145 *line
= source
->buf
+ source
->offset
;
146 *size
= c
+ 1 - source
->buf
- source
->offset
;
147 source
->offset
+= *size
;
152 int push_data(RemoteSource
*source
, const char *data
, size_t size
) {
154 assert(source
->state
!= STATE_EOF
);
156 if (!realloc_buffer(source
, source
->filled
+ size
)) {
157 log_error("Failed to store received data of size %zu "
158 "(in addition to existing %zu bytes with %zu filled): %s",
159 size
, source
->size
, source
->filled
, strerror(ENOMEM
));
163 memcpy(source
->buf
+ source
->filled
, data
, size
);
164 source
->filled
+= size
;
169 static int fill_fixed_size(RemoteSource
*source
, void **data
, size_t size
) {
172 assert(source
->state
== STATE_DATA_START
||
173 source
->state
== STATE_DATA
||
174 source
->state
== STATE_DATA_FINISH
);
175 assert(size
<= DATA_SIZE_MAX
);
176 assert(source
->offset
<= source
->filled
);
177 assert(source
->filled
<= source
->size
);
178 assert(source
->buf
!= NULL
|| source
->size
== 0);
179 assert(source
->buf
== NULL
|| source
->size
> 0);
180 assert(source
->fd
>= 0);
183 while (source
->filled
- source
->offset
< size
) {
186 if (source
->passive_fd
)
187 /* we have to wait for some data to come to us */
190 if (!realloc_buffer(source
, source
->offset
+ size
))
193 n
= read(source
->fd
, source
->buf
+ source
->filled
,
194 source
->size
- source
->filled
);
197 log_error_errno(errno
, "read(%d, ..., %zu): %m", source
->fd
,
198 source
->size
- source
->filled
);
206 *data
= source
->buf
+ source
->offset
;
207 source
->offset
+= size
;
212 static int get_data_size(RemoteSource
*source
) {
217 assert(source
->state
== STATE_DATA_START
);
218 assert(source
->data_size
== 0);
220 r
= fill_fixed_size(source
, &data
, sizeof(uint64_t));
224 source
->data_size
= le64toh( *(uint64_t *) data
);
225 if (source
->data_size
> DATA_SIZE_MAX
) {
226 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
227 source
->data_size
, DATA_SIZE_MAX
);
230 if (source
->data_size
== 0)
231 log_warning("Binary field with zero length");
236 static int get_data_data(RemoteSource
*source
, void **data
) {
241 assert(source
->state
== STATE_DATA
);
243 r
= fill_fixed_size(source
, data
, source
->data_size
);
250 static int get_data_newline(RemoteSource
*source
) {
255 assert(source
->state
== STATE_DATA_FINISH
);
257 r
= fill_fixed_size(source
, (void**) &data
, 1);
263 log_error("expected newline, got '%c'", *data
);
270 static int process_dunder(RemoteSource
*source
, char *line
, size_t n
) {
271 const char *timestamp
;
276 assert(line
[n
-1] == '\n');
278 /* XXX: is it worth to support timestamps in extended format?
279 * We don't produce them, but who knows... */
281 timestamp
= startswith(line
, "__CURSOR=");
283 /* ignore __CURSOR */
286 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
288 long long unsigned x
;
290 r
= safe_atollu(timestamp
, &x
);
292 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
294 source
->ts
.realtime
= x
;
295 return r
< 0 ? r
: 1;
298 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
300 long long unsigned x
;
302 r
= safe_atollu(timestamp
, &x
);
304 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
306 source
->ts
.monotonic
= x
;
307 return r
< 0 ? r
: 1;
310 timestamp
= startswith(line
, "__");
312 log_notice("Unknown dunder line %s", line
);
320 static int process_data(RemoteSource
*source
) {
323 switch(source
->state
) {
328 assert(source
->data_size
== 0);
330 r
= get_line(source
, &line
, &n
);
334 source
->state
= STATE_EOF
;
338 assert(line
[n
-1] == '\n');
341 log_trace("Received empty line, event is ready");
345 r
= process_dunder(source
, line
, n
);
347 return r
< 0 ? r
: 0;
352 LLLLLLLL0011223344...\n
354 sep
= memchr(line
, '=', n
);
359 r
= iovw_put(&source
->iovw
, line
, n
);
363 /* replace \n with = */
366 source
->field_len
= n
;
367 source
->state
= STATE_DATA_START
;
369 /* we cannot put the field in iovec until we have all data */
372 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
374 return 0; /* continue */
377 case STATE_DATA_START
:
378 assert(source
->data_size
== 0);
380 r
= get_data_size(source
);
381 // log_debug("get_data_size() -> %d", r);
385 source
->state
= STATE_EOF
;
389 source
->state
= source
->data_size
> 0 ?
390 STATE_DATA
: STATE_DATA_FINISH
;
392 return 0; /* continue */
398 assert(source
->data_size
> 0);
400 r
= get_data_data(source
, &data
);
401 // log_debug("get_data_data() -> %d", r);
405 source
->state
= STATE_EOF
;
411 field
= (char*) data
- sizeof(uint64_t) - source
->field_len
;
412 memmove(field
+ sizeof(uint64_t), field
, source
->field_len
);
414 r
= iovw_put(&source
->iovw
, field
+ sizeof(uint64_t), source
->field_len
+ source
->data_size
);
418 source
->state
= STATE_DATA_FINISH
;
420 return 0; /* continue */
423 case STATE_DATA_FINISH
:
424 r
= get_data_newline(source
);
425 // log_debug("get_data_newline() -> %d", r);
429 source
->state
= STATE_EOF
;
433 source
->data_size
= 0;
434 source
->state
= STATE_LINE
;
436 return 0; /* continue */
438 assert_not_reached("wtf?");
442 int process_source(RemoteSource
*source
, bool compress
, bool seal
) {
443 size_t remain
, target
;
447 assert(source
->writer
);
449 r
= process_data(source
);
453 /* We have a full event */
454 log_trace("Received full event from source@%p fd:%d (%s)",
455 source
, source
->fd
, source
->name
);
457 if (!source
->iovw
.count
) {
458 log_warning("Entry with no payload, skipping");
462 assert(source
->iovw
.iovec
);
463 assert(source
->iovw
.count
);
465 r
= writer_write(source
->writer
, &source
->iovw
, &source
->ts
, compress
, seal
);
467 log_error_errno(r
, "Failed to write entry of %zu bytes: %m",
468 iovw_size(&source
->iovw
));
473 iovw_free_contents(&source
->iovw
);
475 /* possibly reset buffer position */
476 remain
= source
->filled
- source
->offset
;
478 if (remain
== 0) /* no brainer */
479 source
->offset
= source
->scanned
= source
->filled
= 0;
480 else if (source
->offset
> source
->size
- source
->filled
&&
481 source
->offset
> remain
) {
482 memcpy(source
->buf
, source
->buf
+ source
->offset
, remain
);
483 source
->offset
= source
->scanned
= 0;
484 source
->filled
= remain
;
487 target
= source
->size
;
488 while (target
> 16 * LINE_CHUNK
&& remain
< target
/ 2)
490 if (target
< source
->size
) {
493 tmp
= realloc(source
->buf
, target
);
495 log_warning("Failed to reallocate buffer to (smaller) size %zu",
498 log_debug("Reallocated buffer from %zu to %zu bytes",
499 source
->size
, target
);
501 source
->size
= target
;