]>
git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journald-native.h"
23 #include "string-util.h"
24 #include "journal-remote-parse.h"
/* Step size, in bytes, used when growing the receive buffer while reading
 * lines. The expansion is parenthesized so the macro behaves as a single
 * operand next to higher-precedence operators such as '%' or '/'. */
#define LINE_CHUNK (8*1024u)
28 void source_free(RemoteSource
*source
) {
32 if (source
->fd
>= 0 && !source
->passive_fd
) {
33 log_debug("Closing fd:%d (%s)", source
->fd
, source
->name
);
34 safe_close(source
->fd
);
39 iovw_free_contents(&source
->iovw
);
41 log_debug("Writer ref count %i", source
->writer
->n_ref
);
42 writer_unref(source
->writer
);
44 sd_event_source_unref(source
->event
);
45 sd_event_source_unref(source
->buffer_event
);
51 * Initialize zero-filled source with given values. On success, takes
52 * ownership of fd and writer, otherwise does not touch them.
54 RemoteSource
* source_new(int fd
, bool passive_fd
, char *name
, Writer
*writer
) {
58 log_debug("Creating source for %sfd:%d (%s)",
59 passive_fd
? "passive " : "", fd
, name
);
63 source
= new0(RemoteSource
, 1);
68 source
->passive_fd
= passive_fd
;
70 source
->writer
= writer
;
75 static char* realloc_buffer(RemoteSource
*source
, size_t size
) {
76 char *b
, *old
= source
->buf
;
78 b
= GREEDY_REALLOC(source
->buf
, source
->size
, size
);
82 iovw_rebase(&source
->iovw
, old
, source
->buf
);
87 static int get_line(RemoteSource
*source
, char **line
, size_t *size
) {
92 assert(source
->state
== STATE_LINE
);
93 assert(source
->offset
<= source
->filled
);
94 assert(source
->filled
<= source
->size
);
95 assert(source
->buf
== NULL
|| source
->size
> 0);
96 assert(source
->fd
>= 0);
100 size_t start
= MAX(source
->scanned
, source
->offset
);
102 c
= memchr(source
->buf
+ start
, '\n',
103 source
->filled
- start
);
108 source
->scanned
= source
->filled
;
109 if (source
->scanned
>= DATA_SIZE_MAX
) {
110 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX
);
114 if (source
->passive_fd
)
115 /* we have to wait for some data to come to us */
118 /* We know that source->filled is at most DATA_SIZE_MAX, so if
119 we reallocate it, we'll increase the size at least a bit. */
120 assert_cc(DATA_SIZE_MAX
< ENTRY_SIZE_MAX
);
121 if (source
->size
- source
->filled
< LINE_CHUNK
&&
122 !realloc_buffer(source
, MIN(source
->filled
+ LINE_CHUNK
, ENTRY_SIZE_MAX
)))
126 assert(source
->size
- source
->filled
>= LINE_CHUNK
||
127 source
->size
== ENTRY_SIZE_MAX
);
130 source
->buf
+ source
->filled
,
131 source
->size
- source
->filled
);
134 log_error_errno(errno
, "read(%d, ..., %zu): %m",
136 source
->size
- source
->filled
);
144 *line
= source
->buf
+ source
->offset
;
145 *size
= c
+ 1 - source
->buf
- source
->offset
;
146 source
->offset
+= *size
;
151 int push_data(RemoteSource
*source
, const char *data
, size_t size
) {
153 assert(source
->state
!= STATE_EOF
);
155 if (!realloc_buffer(source
, source
->filled
+ size
)) {
156 log_error("Failed to store received data of size %zu "
157 "(in addition to existing %zu bytes with %zu filled): %s",
158 size
, source
->size
, source
->filled
, strerror(ENOMEM
));
162 memcpy(source
->buf
+ source
->filled
, data
, size
);
163 source
->filled
+= size
;
168 static int fill_fixed_size(RemoteSource
*source
, void **data
, size_t size
) {
171 assert(source
->state
== STATE_DATA_START
||
172 source
->state
== STATE_DATA
||
173 source
->state
== STATE_DATA_FINISH
);
174 assert(size
<= DATA_SIZE_MAX
);
175 assert(source
->offset
<= source
->filled
);
176 assert(source
->filled
<= source
->size
);
177 assert(source
->buf
!= NULL
|| source
->size
== 0);
178 assert(source
->buf
== NULL
|| source
->size
> 0);
179 assert(source
->fd
>= 0);
182 while (source
->filled
- source
->offset
< size
) {
185 if (source
->passive_fd
)
186 /* we have to wait for some data to come to us */
189 if (!realloc_buffer(source
, source
->offset
+ size
))
192 n
= read(source
->fd
, source
->buf
+ source
->filled
,
193 source
->size
- source
->filled
);
196 log_error_errno(errno
, "read(%d, ..., %zu): %m", source
->fd
,
197 source
->size
- source
->filled
);
205 *data
= source
->buf
+ source
->offset
;
206 source
->offset
+= size
;
211 static int get_data_size(RemoteSource
*source
) {
216 assert(source
->state
== STATE_DATA_START
);
217 assert(source
->data_size
== 0);
219 r
= fill_fixed_size(source
, &data
, sizeof(uint64_t));
223 source
->data_size
= le64toh( *(uint64_t *) data
);
224 if (source
->data_size
> DATA_SIZE_MAX
) {
225 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
226 source
->data_size
, DATA_SIZE_MAX
);
229 if (source
->data_size
== 0)
230 log_warning("Binary field with zero length");
235 static int get_data_data(RemoteSource
*source
, void **data
) {
240 assert(source
->state
== STATE_DATA
);
242 r
= fill_fixed_size(source
, data
, source
->data_size
);
249 static int get_data_newline(RemoteSource
*source
) {
254 assert(source
->state
== STATE_DATA_FINISH
);
256 r
= fill_fixed_size(source
, (void**) &data
, 1);
262 log_error("expected newline, got '%c'", *data
);
269 static int process_dunder(RemoteSource
*source
, char *line
, size_t n
) {
270 const char *timestamp
;
275 assert(line
[n
-1] == '\n');
277 /* XXX: is it worth supporting timestamps in extended format?
278 * We don't produce them, but who knows... */
280 timestamp
= startswith(line
, "__CURSOR=");
282 /* ignore __CURSOR */
285 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
287 long long unsigned x
;
289 r
= safe_atollu(timestamp
, &x
);
291 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
293 source
->ts
.realtime
= x
;
294 return r
< 0 ? r
: 1;
297 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
299 long long unsigned x
;
301 r
= safe_atollu(timestamp
, &x
);
303 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
305 source
->ts
.monotonic
= x
;
306 return r
< 0 ? r
: 1;
309 timestamp
= startswith(line
, "__");
311 log_notice("Unknown dunder line %s", line
);
319 static int process_data(RemoteSource
*source
) {
322 switch(source
->state
) {
327 assert(source
->data_size
== 0);
329 r
= get_line(source
, &line
, &n
);
333 source
->state
= STATE_EOF
;
337 assert(line
[n
-1] == '\n');
340 log_trace("Received empty line, event is ready");
344 r
= process_dunder(source
, line
, n
);
346 return r
< 0 ? r
: 0;
351 LLLLLLLL0011223344...\n
353 sep
= memchr(line
, '=', n
);
358 r
= iovw_put(&source
->iovw
, line
, n
);
362 /* replace \n with = */
365 source
->field_len
= n
;
366 source
->state
= STATE_DATA_START
;
368 /* we cannot put the field in iovec until we have all data */
371 log_trace("Received: %.*s (%s)", (int) n
, line
, sep
? "text" : "binary");
373 return 0; /* continue */
376 case STATE_DATA_START
:
377 assert(source
->data_size
== 0);
379 r
= get_data_size(source
);
380 // log_debug("get_data_size() -> %d", r);
384 source
->state
= STATE_EOF
;
388 source
->state
= source
->data_size
> 0 ?
389 STATE_DATA
: STATE_DATA_FINISH
;
391 return 0; /* continue */
397 assert(source
->data_size
> 0);
399 r
= get_data_data(source
, &data
);
400 // log_debug("get_data_data() -> %d", r);
404 source
->state
= STATE_EOF
;
410 field
= (char*) data
- sizeof(uint64_t) - source
->field_len
;
411 memmove(field
+ sizeof(uint64_t), field
, source
->field_len
);
413 r
= iovw_put(&source
->iovw
, field
+ sizeof(uint64_t), source
->field_len
+ source
->data_size
);
417 source
->state
= STATE_DATA_FINISH
;
419 return 0; /* continue */
422 case STATE_DATA_FINISH
:
423 r
= get_data_newline(source
);
424 // log_debug("get_data_newline() -> %d", r);
428 source
->state
= STATE_EOF
;
432 source
->data_size
= 0;
433 source
->state
= STATE_LINE
;
435 return 0; /* continue */
437 assert_not_reached("wtf?");
441 int process_source(RemoteSource
*source
, bool compress
, bool seal
) {
442 size_t remain
, target
;
446 assert(source
->writer
);
448 r
= process_data(source
);
452 /* We have a full event */
453 log_trace("Received full event from source@%p fd:%d (%s)",
454 source
, source
->fd
, source
->name
);
456 if (!source
->iovw
.count
) {
457 log_warning("Entry with no payload, skipping");
461 assert(source
->iovw
.iovec
);
462 assert(source
->iovw
.count
);
464 r
= writer_write(source
->writer
, &source
->iovw
, &source
->ts
, compress
, seal
);
466 log_error_errno(r
, "Failed to write entry of %zu bytes: %m",
467 iovw_size(&source
->iovw
));
472 iovw_free_contents(&source
->iovw
);
474 /* possibly reset buffer position */
475 remain
= source
->filled
- source
->offset
;
477 if (remain
== 0) /* no brainer */
478 source
->offset
= source
->scanned
= source
->filled
= 0;
479 else if (source
->offset
> source
->size
- source
->filled
&&
480 source
->offset
> remain
) {
481 memcpy(source
->buf
, source
->buf
+ source
->offset
, remain
);
482 source
->offset
= source
->scanned
= 0;
483 source
->filled
= remain
;
486 target
= source
->size
;
487 while (target
> 16 * LINE_CHUNK
&& remain
< target
/ 2)
489 if (target
< source
->size
) {
492 tmp
= realloc(source
->buf
, target
);
494 log_warning("Failed to reallocate buffer to (smaller) size %zu",
497 log_debug("Reallocated buffer from %zu to %zu bytes",
498 source
->size
, target
);
500 source
->size
= target
;