]>
git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
23 #include "journal-remote-parse.h"
24 #include "journald-native.h"
25 #include "parse-util.h"
26 #include "string-util.h"
28 #define LINE_CHUNK 8*1024u
30 void source_free(RemoteSource
*source
) {
34 if (source
->fd
>= 0 && !source
->passive_fd
) {
35 log_debug("Closing fd:%d (%s)", source
->fd
, source
->name
);
36 safe_close(source
->fd
);
41 iovw_free_contents(&source
->iovw
);
43 log_debug("Writer ref count %i", source
->writer
->n_ref
);
44 writer_unref(source
->writer
);
46 sd_event_source_unref(source
->event
);
47 sd_event_source_unref(source
->buffer_event
);
53 * Initialize zero-filled source with given values. On success, takes
54 * ownership of fd and writer, otherwise does not touch them.
56 RemoteSource
* source_new(int fd
, bool passive_fd
, char *name
, Writer
*writer
) {
60 log_debug("Creating source for %sfd:%d (%s)",
61 passive_fd
? "passive " : "", fd
, name
);
65 source
= new0(RemoteSource
, 1);
70 source
->passive_fd
= passive_fd
;
72 source
->writer
= writer
;
77 static char* realloc_buffer(RemoteSource
*source
, size_t size
) {
78 char *b
, *old
= source
->buf
;
80 b
= GREEDY_REALLOC(source
->buf
, source
->size
, size
);
84 iovw_rebase(&source
->iovw
, old
, source
->buf
);
89 static int get_line(RemoteSource
*source
, char **line
, size_t *size
) {
94 assert(source
->state
== STATE_LINE
);
95 assert(source
->offset
<= source
->filled
);
96 assert(source
->filled
<= source
->size
);
97 assert(source
->buf
== NULL
|| source
->size
> 0);
98 assert(source
->fd
>= 0);
102 size_t start
= MAX(source
->scanned
, source
->offset
);
104 c
= memchr(source
->buf
+ start
, '\n',
105 source
->filled
- start
);
110 source
->scanned
= source
->filled
;
111 if (source
->scanned
>= DATA_SIZE_MAX
) {
112 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
116 if (source
->passive_fd
)
117 /* we have to wait for some data to come to us */
120 /* We know that source->filled is at most DATA_SIZE_MAX, so if
121 we reallocate it, we'll increase the size at least a bit. */
122 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
123 if (source
->size
- source
->filled
< LINE_CHUNK
&&
124 !realloc_buffer(source
, MIN(source
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
128 assert(source
->size
- source
->filled
>= LINE_CHUNK
||
129 source
->size
== ENTRY_SIZE_MAX
);
132 source
->buf
+ source
->filled
,
133 source
->size
- source
->filled
);
136 log_error_errno(errno
, "read(%d, ..., %zu): %m",
138 source
->size
- source
->filled
);
146 *line
= source
->buf
+ source
->offset
;
147 *size
= c
+ 1 - source
->buf
- source
->offset
;
148 source
->offset
+= *size
;
153 int push_data(RemoteSource
*source
, const char *data
, size_t size
) {
155 assert(source
->state
!= STATE_EOF
);
157 if (!realloc_buffer(source
, source
->filled
+ size
)) {
158 log_error("Failed to store received data of size %zu "
159 "(in addition to existing %zu bytes with %zu filled): %s",
160 size
, source
->size
, source
->filled
, strerror(ENOMEM
));
164 memcpy(source
->buf
+ source
->filled
, data
, size
);
165 source
->filled
+= size
;
170 static int fill_fixed_size(RemoteSource
*source
, void **data
, size_t size
) {
173 assert(source
->state
== STATE_DATA_START
||
174 source
->state
== STATE_DATA
||
175 source
->state
== STATE_DATA_FINISH
);
176 assert(size
<= DATA_SIZE_MAX
);
177 assert(source
->offset
<= source
->filled
);
178 assert(source
->filled
<= source
->size
);
179 assert(source
->buf
!= NULL
|| source
->size
== 0);
180 assert(source
->buf
== NULL
|| source
->size
> 0);
181 assert(source
->fd
>= 0);
184 while (source
->filled
- source
->offset
< size
) {
187 if (source
->passive_fd
)
188 /* we have to wait for some data to come to us */
191 if (!realloc_buffer(source
, source
->offset
+ size
))
194 n
= read(source
->fd
, source
->buf
+ source
->filled
,
195 source
->size
- source
->filled
);
198 log_error_errno(errno
, "read(%d, ..., %zu): %m", source
->fd
,
199 source
->size
- source
->filled
);
207 *data
= source
->buf
+ source
->offset
;
208 source
->offset
+= size
;
213 static int get_data_size(RemoteSource
*source
) {
218 assert(source
->state
== STATE_DATA_START
);
219 assert(source
->data_size
== 0);
221 r
= fill_fixed_size(source
, &data
, sizeof(uint64_t));
225 source
->data_size
= le64toh( *(uint64_t *) data
);
226 if (source
->data_size
> DATA_SIZE_MAX
) {
227 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
228 source
->data_size
, DATA_SIZE_MAX
);
231 if (source
->data_size
== 0)
232 log_warning("Binary field with zero length");
237 static int get_data_data(RemoteSource
*source
, void **data
) {
242 assert(source
->state
== STATE_DATA
);
244 r
= fill_fixed_size(source
, data
, source
->data_size
);
251 static int get_data_newline(RemoteSource
*source
) {
256 assert(source
->state
== STATE_DATA_FINISH
);
258 r
= fill_fixed_size(source
, (void**) &data
, 1);
264 log_error("expected newline, got '%c'", *data
);
271 static int process_dunder(RemoteSource
*source
, char *line
, size_t n
) {
272 const char *timestamp
;
277 assert(line
[n
-1] == '\n');
279 /* XXX: is it worth supporting timestamps in extended format?
280 * We don't produce them, but who knows... */
282 timestamp
= startswith(line
, "__CURSOR=");
284 /* ignore __CURSOR */
287 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
289 long long unsigned x
;
291 r
= safe_atollu(timestamp
, &x
);
293 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
295 source
->ts
.realtime
= x
;
296 return r
< 0 ? r
: 1;
299 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
301 long long unsigned x
;
303 r
= safe_atollu(timestamp
, &x
);
305 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
307 source
->ts
.monotonic
= x
;
308 return r
< 0 ? r
: 1;
311 timestamp
= startswith(line
, "__");
313 log_notice("Unknown dunder line %s", line
);
321 static int process_data(RemoteSource
*source
) {
324 switch(source
->state
) {
329 assert(source
->data_size
== 0);
331 r
= get_line(source
, &line
, &n
);
335 source
->state
= STATE_EOF
;
339 assert(line
[n
-1] == '\n');
342 log_trace("Received empty line, event is ready");
346 r
= process_dunder(source
, line
, n
);
348 return r
< 0 ? r
: 0;
353 LLLLLLLL0011223344...\n
355 sep
= memchr(line
, '=', n
);
360 r
= iovw_put(&source
->iovw
, line
, n
);
364 /* replace \n with = */
367 source
->field_len
= n
;
368 source
->state
= STATE_DATA_START
;
370 /* we cannot put the field in iovec until we have all data */
373 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
375 return 0; /* continue */
378 case STATE_DATA_START
:
379 assert(source
->data_size
== 0);
381 r
= get_data_size(source
);
382 // log_debug("get_data_size() -> %d", r);
386 source
->state
= STATE_EOF
;
390 source
->state
= source
->data_size
> 0 ?
391 STATE_DATA
: STATE_DATA_FINISH
;
393 return 0; /* continue */
399 assert(source
->data_size
> 0);
401 r
= get_data_data(source
, &data
);
402 // log_debug("get_data_data() -> %d", r);
406 source
->state
= STATE_EOF
;
412 field
= (char*) data
- sizeof(uint64_t) - source
->field_len
;
413 memmove(field
+ sizeof(uint64_t), field
, source
->field_len
);
415 r
= iovw_put(&source
->iovw
, field
+ sizeof(uint64_t), source
->field_len
+ source
->data_size
);
419 source
->state
= STATE_DATA_FINISH
;
421 return 0; /* continue */
424 case STATE_DATA_FINISH
:
425 r
= get_data_newline(source
);
426 // log_debug("get_data_newline() -> %d", r);
430 source
->state
= STATE_EOF
;
434 source
->data_size
= 0;
435 source
->state
= STATE_LINE
;
437 return 0; /* continue */
439 assert_not_reached("wtf?");
443 int process_source(RemoteSource
*source
, bool compress
, bool seal
) {
444 size_t remain
, target
;
448 assert(source
->writer
);
450 r
= process_data(source
);
454 /* We have a full event */
455 log_trace("Received full event from source@%p fd:%d (%s)",
456 source
, source
->fd
, source
->name
);
458 if (!source
->iovw
.count
) {
459 log_warning("Entry with no payload, skipping");
463 assert(source
->iovw
.iovec
);
464 assert(source
->iovw
.count
);
466 r
= writer_write(source
->writer
, &source
->iovw
, &source
->ts
, compress
, seal
);
468 log_error_errno(r
, "Failed to write entry of %zu bytes: %m",
469 iovw_size(&source
->iovw
));
474 iovw_free_contents(&source
->iovw
);
476 /* possibly reset buffer position */
477 remain
= source
->filled
- source
->offset
;
479 if (remain
== 0) /* no brainer */
480 source
->offset
= source
->scanned
= source
->filled
= 0;
481 else if (source
->offset
> source
->size
- source
->filled
&&
482 source
->offset
> remain
) {
483 memcpy(source
->buf
, source
->buf
+ source
->offset
, remain
);
484 source
->offset
= source
->scanned
= 0;
485 source
->filled
= remain
;
488 target
= source
->size
;
489 while (target
> 16 * LINE_CHUNK
&& remain
< target
/ 2)
491 if (target
< source
->size
) {
494 tmp
= realloc(source
->buf
, target
);
496 log_warning("Failed to reallocate buffer to (smaller) size %zu",
499 log_debug("Reallocated buffer from %zu to %zu bytes",
500 source
->size
, target
);
502 source
->size
= target
;