]>
git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
4 This file is part of systemd.
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
25 #define LINE_CHUNK 1024u
27 void source_free(RemoteSource
*source
) {
31 if (source
->fd
>= 0) {
32 log_debug("Closing fd:%d (%s)", source
->fd
, source
->name
);
37 iovw_free_contents(&source
->iovw
);
41 static int get_line(RemoteSource
*source
, char **line
, size_t *size
) {
48 assert(source
->state
== STATE_LINE
);
49 assert(source
->filled
<= source
->size
);
50 assert(source
->buf
== NULL
|| source
->size
> 0);
52 c
= memchr(source
->buf
, '\n', source
->filled
);
57 if (source
->size
- source
->filled
< LINE_CHUNK
) {
58 // XXX: add check for maximum line length
60 if (!GREEDY_REALLOC(source
->buf
, source
->size
,
61 source
->filled
+ LINE_CHUNK
))
64 assert(source
->size
- source
->filled
>= LINE_CHUNK
);
66 n
= read(source
->fd
, source
->buf
+ source
->filled
,
67 source
->size
- source
->filled
);
69 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
)
70 log_error("read(%d, ..., %zd): %m", source
->fd
,
71 source
->size
- source
->filled
);
76 c
= memchr(source
->buf
+ source
->filled
, '\n', n
);
84 *size
= c
+ 1 - source
->buf
;
86 /* Check if something remains */
87 remain
= source
->buf
+ source
->filled
- c
- 1;
90 newsize
= MAX(remain
, LINE_CHUNK
);
91 newbuf
= malloc(newsize
);
94 memcpy(newbuf
, c
+ 1, remain
);
97 source
->size
= newsize
;
98 source
->filled
= remain
;
103 int push_data(RemoteSource
*source
, const char *data
, size_t size
) {
105 assert(source
->state
!= STATE_EOF
);
107 if (!GREEDY_REALLOC(source
->buf
, source
->size
,
108 source
->filled
+ size
))
111 memcpy(source
->buf
+ source
->filled
, data
, size
);
112 source
->filled
+= size
;
117 static int fill_fixed_size(RemoteSource
*source
, void **data
, size_t size
) {
120 size_t newsize
= 0, remain
;
123 assert(source
->state
== STATE_DATA_START
||
124 source
->state
== STATE_DATA
||
125 source
->state
== STATE_DATA_FINISH
);
126 assert(size
<= DATA_SIZE_MAX
);
127 assert(source
->filled
<= source
->size
);
128 assert(source
->buf
!= NULL
|| source
->size
== 0);
129 assert(source
->buf
== NULL
|| source
->size
> 0);
132 while(source
->filled
< size
) {
133 if (!GREEDY_REALLOC(source
->buf
, source
->size
, size
))
136 n
= read(source
->fd
, source
->buf
+ source
->filled
,
137 source
->size
- source
->filled
);
139 if (errno
!= EAGAIN
&& errno
!= EWOULDBLOCK
)
140 log_error("read(%d, ..., %zd): %m", source
->fd
,
141 source
->size
- source
->filled
);
151 /* Check if something remains */
152 assert(size
<= source
->filled
);
153 remain
= source
->filled
- size
;
155 newsize
= MAX(remain
, LINE_CHUNK
);
156 newbuf
= malloc(newsize
);
159 memcpy(newbuf
, source
->buf
+ size
, remain
);
161 source
->buf
= newbuf
;
162 source
->size
= newsize
;
163 source
->filled
= remain
;
168 static int get_data_size(RemoteSource
*source
) {
170 void _cleanup_free_
*data
= NULL
;
173 assert(source
->state
== STATE_DATA_START
);
174 assert(source
->data_size
== 0);
176 r
= fill_fixed_size(source
, &data
, sizeof(uint64_t));
180 source
->data_size
= le64toh( *(uint64_t *) data
);
181 if (source
->data_size
> DATA_SIZE_MAX
) {
182 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
183 source
->data_size
, DATA_SIZE_MAX
);
186 if (source
->data_size
== 0)
187 log_warning("Binary field with zero length");
192 static int get_data_data(RemoteSource
*source
, void **data
) {
197 assert(source
->state
== STATE_DATA
);
199 r
= fill_fixed_size(source
, data
, source
->data_size
);
206 static int get_data_newline(RemoteSource
*source
) {
208 char _cleanup_free_
*data
= NULL
;
211 assert(source
->state
== STATE_DATA_FINISH
);
213 r
= fill_fixed_size(source
, (void**) &data
, 1);
219 log_error("expected newline, got '%c'", *data
);
226 static int process_dunder(RemoteSource
*source
, char *line
, size_t n
) {
227 const char *timestamp
;
232 assert(line
[n
-1] == '\n');
234 /* XXX: is it worth to support timestamps in extended format?
235 * We don't produce them, but who knows... */
237 timestamp
= startswith(line
, "__CURSOR=");
239 /* ignore __CURSOR */
242 timestamp
= startswith(line
, "__REALTIME_TIMESTAMP=");
244 long long unsigned x
;
246 r
= safe_atollu(timestamp
, &x
);
248 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp
);
250 source
->ts
.realtime
= x
;
251 return r
< 0 ? r
: 1;
254 timestamp
= startswith(line
, "__MONOTONIC_TIMESTAMP=");
256 long long unsigned x
;
258 r
= safe_atollu(timestamp
, &x
);
260 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp
);
262 source
->ts
.monotonic
= x
;
263 return r
< 0 ? r
: 1;
266 timestamp
= startswith(line
, "__");
268 log_notice("Unknown dunder line %s", line
);
276 int process_data(RemoteSource
*source
) {
279 switch(source
->state
) {
284 assert(source
->data_size
== 0);
286 r
= get_line(source
, &line
, &n
);
290 source
->state
= STATE_EOF
;
294 assert(line
[n
-1] == '\n');
297 log_debug("Received empty line, event is ready");
302 r
= process_dunder(source
, line
, n
);
305 return r
< 0 ? r
: 0;
311 LLLLLLLL0011223344...\n
313 sep
= memchr(line
, '=', n
);
318 /* replace \n with = */
320 log_debug("Received: %.*s", (int) n
, line
);
322 r
= iovw_put(&source
->iovw
, line
, n
);
324 log_error("Failed to put line in iovect");
330 source
->state
= STATE_DATA_START
;
331 return 0; /* continue */
334 case STATE_DATA_START
:
335 assert(source
->data_size
== 0);
337 r
= get_data_size(source
);
338 log_debug("get_data_size() -> %d", r
);
342 source
->state
= STATE_EOF
;
346 source
->state
= source
->data_size
> 0 ?
347 STATE_DATA
: STATE_DATA_FINISH
;
349 return 0; /* continue */
354 assert(source
->data_size
> 0);
356 r
= get_data_data(source
, &data
);
357 log_debug("get_data_data() -> %d", r
);
361 source
->state
= STATE_EOF
;
367 r
= iovw_put(&source
->iovw
, data
, source
->data_size
);
369 log_error("failed to put binary buffer in iovect");
373 source
->state
= STATE_DATA_FINISH
;
375 return 0; /* continue */
378 case STATE_DATA_FINISH
:
379 r
= get_data_newline(source
);
380 log_debug("get_data_newline() -> %d", r
);
384 source
->state
= STATE_EOF
;
388 source
->data_size
= 0;
389 source
->state
= STATE_LINE
;
391 return 0; /* continue */
393 assert_not_reached("wtf?");
397 int process_source(RemoteSource
*source
, Writer
*writer
, bool compress
, bool seal
) {
403 r
= process_data(source
);
407 /* We have a full event */
408 log_info("Received a full event from source@%p fd:%d (%s)",
409 source
, source
->fd
, source
->name
);
411 if (!source
->iovw
.count
) {
412 log_warning("Entry with no payload, skipping");
416 assert(source
->iovw
.iovec
);
417 assert(source
->iovw
.count
);
419 r
= writer_write(writer
, &source
->iovw
, &source
->ts
, compress
, seal
);
421 log_error("Failed to write entry of %zu bytes: %s",
422 iovw_size(&source
->iovw
), strerror(-r
));
427 iovw_free_contents(&source
->iovw
);