]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
Merge pull request #1607 from keszybz/lz4-remove-v1
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "alloc-util.h"
23 #include "fd-util.h"
24 #include "journal-remote-parse.h"
25 #include "journald-native.h"
26 #include "parse-util.h"
27 #include "string-util.h"
28
29 #define LINE_CHUNK 8*1024u
30
31 void source_free(RemoteSource *source) {
32 if (!source)
33 return;
34
35 if (source->fd >= 0 && !source->passive_fd) {
36 log_debug("Closing fd:%d (%s)", source->fd, source->name);
37 safe_close(source->fd);
38 }
39
40 free(source->name);
41 free(source->buf);
42 iovw_free_contents(&source->iovw);
43
44 log_debug("Writer ref count %i", source->writer->n_ref);
45 writer_unref(source->writer);
46
47 sd_event_source_unref(source->event);
48 sd_event_source_unref(source->buffer_event);
49
50 free(source);
51 }
52
53 /**
54 * Initialize zero-filled source with given values. On success, takes
55 * ownerhship of fd and writer, otherwise does not touch them.
56 */
57 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
58
59 RemoteSource *source;
60
61 log_debug("Creating source for %sfd:%d (%s)",
62 passive_fd ? "passive " : "", fd, name);
63
64 assert(fd >= 0);
65
66 source = new0(RemoteSource, 1);
67 if (!source)
68 return NULL;
69
70 source->fd = fd;
71 source->passive_fd = passive_fd;
72 source->name = name;
73 source->writer = writer;
74
75 return source;
76 }
77
78 static char* realloc_buffer(RemoteSource *source, size_t size) {
79 char *b, *old = source->buf;
80
81 b = GREEDY_REALLOC(source->buf, source->size, size);
82 if (!b)
83 return NULL;
84
85 iovw_rebase(&source->iovw, old, source->buf);
86
87 return b;
88 }
89
90 static int get_line(RemoteSource *source, char **line, size_t *size) {
91 ssize_t n;
92 char *c = NULL;
93
94 assert(source);
95 assert(source->state == STATE_LINE);
96 assert(source->offset <= source->filled);
97 assert(source->filled <= source->size);
98 assert(source->buf == NULL || source->size > 0);
99 assert(source->fd >= 0);
100
101 for (;;) {
102 if (source->buf) {
103 size_t start = MAX(source->scanned, source->offset);
104
105 c = memchr(source->buf + start, '\n',
106 source->filled - start);
107 if (c != NULL)
108 break;
109 }
110
111 source->scanned = source->filled;
112 if (source->scanned >= DATA_SIZE_MAX) {
113 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
114 return -E2BIG;
115 }
116
117 if (source->passive_fd)
118 /* we have to wait for some data to come to us */
119 return -EAGAIN;
120
121 /* We know that source->filled is at most DATA_SIZE_MAX, so if
122 we reallocate it, we'll increase the size at least a bit. */
123 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
124 if (source->size - source->filled < LINE_CHUNK &&
125 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
126 return log_oom();
127
128 assert(source->buf);
129 assert(source->size - source->filled >= LINE_CHUNK ||
130 source->size == ENTRY_SIZE_MAX);
131
132 n = read(source->fd,
133 source->buf + source->filled,
134 source->size - source->filled);
135 if (n < 0) {
136 if (errno != EAGAIN)
137 log_error_errno(errno, "read(%d, ..., %zu): %m",
138 source->fd,
139 source->size - source->filled);
140 return -errno;
141 } else if (n == 0)
142 return 0;
143
144 source->filled += n;
145 }
146
147 *line = source->buf + source->offset;
148 *size = c + 1 - source->buf - source->offset;
149 source->offset += *size;
150
151 return 1;
152 }
153
154 int push_data(RemoteSource *source, const char *data, size_t size) {
155 assert(source);
156 assert(source->state != STATE_EOF);
157
158 if (!realloc_buffer(source, source->filled + size)) {
159 log_error("Failed to store received data of size %zu "
160 "(in addition to existing %zu bytes with %zu filled): %s",
161 size, source->size, source->filled, strerror(ENOMEM));
162 return -ENOMEM;
163 }
164
165 memcpy(source->buf + source->filled, data, size);
166 source->filled += size;
167
168 return 0;
169 }
170
171 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
172
173 assert(source);
174 assert(source->state == STATE_DATA_START ||
175 source->state == STATE_DATA ||
176 source->state == STATE_DATA_FINISH);
177 assert(size <= DATA_SIZE_MAX);
178 assert(source->offset <= source->filled);
179 assert(source->filled <= source->size);
180 assert(source->buf != NULL || source->size == 0);
181 assert(source->buf == NULL || source->size > 0);
182 assert(source->fd >= 0);
183 assert(data);
184
185 while (source->filled - source->offset < size) {
186 int n;
187
188 if (source->passive_fd)
189 /* we have to wait for some data to come to us */
190 return -EAGAIN;
191
192 if (!realloc_buffer(source, source->offset + size))
193 return log_oom();
194
195 n = read(source->fd, source->buf + source->filled,
196 source->size - source->filled);
197 if (n < 0) {
198 if (errno != EAGAIN)
199 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
200 source->size - source->filled);
201 return -errno;
202 } else if (n == 0)
203 return 0;
204
205 source->filled += n;
206 }
207
208 *data = source->buf + source->offset;
209 source->offset += size;
210
211 return 1;
212 }
213
214 static int get_data_size(RemoteSource *source) {
215 int r;
216 void *data;
217
218 assert(source);
219 assert(source->state == STATE_DATA_START);
220 assert(source->data_size == 0);
221
222 r = fill_fixed_size(source, &data, sizeof(uint64_t));
223 if (r <= 0)
224 return r;
225
226 source->data_size = le64toh( *(uint64_t *) data );
227 if (source->data_size > DATA_SIZE_MAX) {
228 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
229 source->data_size, DATA_SIZE_MAX);
230 return -EINVAL;
231 }
232 if (source->data_size == 0)
233 log_warning("Binary field with zero length");
234
235 return 1;
236 }
237
238 static int get_data_data(RemoteSource *source, void **data) {
239 int r;
240
241 assert(source);
242 assert(data);
243 assert(source->state == STATE_DATA);
244
245 r = fill_fixed_size(source, data, source->data_size);
246 if (r <= 0)
247 return r;
248
249 return 1;
250 }
251
252 static int get_data_newline(RemoteSource *source) {
253 int r;
254 char *data;
255
256 assert(source);
257 assert(source->state == STATE_DATA_FINISH);
258
259 r = fill_fixed_size(source, (void**) &data, 1);
260 if (r <= 0)
261 return r;
262
263 assert(data);
264 if (*data != '\n') {
265 log_error("expected newline, got '%c'", *data);
266 return -EINVAL;
267 }
268
269 return 1;
270 }
271
272 static int process_dunder(RemoteSource *source, char *line, size_t n) {
273 const char *timestamp;
274 int r;
275
276 assert(line);
277 assert(n > 0);
278 assert(line[n-1] == '\n');
279
280 /* XXX: is it worth to support timestamps in extended format?
281 * We don't produce them, but who knows... */
282
283 timestamp = startswith(line, "__CURSOR=");
284 if (timestamp)
285 /* ignore __CURSOR */
286 return 1;
287
288 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
289 if (timestamp) {
290 long long unsigned x;
291 line[n-1] = '\0';
292 r = safe_atollu(timestamp, &x);
293 if (r < 0)
294 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
295 else
296 source->ts.realtime = x;
297 return r < 0 ? r : 1;
298 }
299
300 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
301 if (timestamp) {
302 long long unsigned x;
303 line[n-1] = '\0';
304 r = safe_atollu(timestamp, &x);
305 if (r < 0)
306 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
307 else
308 source->ts.monotonic = x;
309 return r < 0 ? r : 1;
310 }
311
312 timestamp = startswith(line, "__");
313 if (timestamp) {
314 log_notice("Unknown dunder line %s", line);
315 return 1;
316 }
317
318 /* no dunder */
319 return 0;
320 }
321
322 static int process_data(RemoteSource *source) {
323 int r;
324
325 switch(source->state) {
326 case STATE_LINE: {
327 char *line, *sep;
328 size_t n = 0;
329
330 assert(source->data_size == 0);
331
332 r = get_line(source, &line, &n);
333 if (r < 0)
334 return r;
335 if (r == 0) {
336 source->state = STATE_EOF;
337 return r;
338 }
339 assert(n > 0);
340 assert(line[n-1] == '\n');
341
342 if (n == 1) {
343 log_trace("Received empty line, event is ready");
344 return 1;
345 }
346
347 r = process_dunder(source, line, n);
348 if (r != 0)
349 return r < 0 ? r : 0;
350
351 /* MESSAGE=xxx\n
352 or
353 COREDUMP\n
354 LLLLLLLL0011223344...\n
355 */
356 sep = memchr(line, '=', n);
357 if (sep) {
358 /* chomp newline */
359 n--;
360
361 r = iovw_put(&source->iovw, line, n);
362 if (r < 0)
363 return r;
364 } else {
365 /* replace \n with = */
366 line[n-1] = '=';
367
368 source->field_len = n;
369 source->state = STATE_DATA_START;
370
371 /* we cannot put the field in iovec until we have all data */
372 }
373
374 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
375
376 return 0; /* continue */
377 }
378
379 case STATE_DATA_START:
380 assert(source->data_size == 0);
381
382 r = get_data_size(source);
383 // log_debug("get_data_size() -> %d", r);
384 if (r < 0)
385 return r;
386 if (r == 0) {
387 source->state = STATE_EOF;
388 return 0;
389 }
390
391 source->state = source->data_size > 0 ?
392 STATE_DATA : STATE_DATA_FINISH;
393
394 return 0; /* continue */
395
396 case STATE_DATA: {
397 void *data;
398 char *field;
399
400 assert(source->data_size > 0);
401
402 r = get_data_data(source, &data);
403 // log_debug("get_data_data() -> %d", r);
404 if (r < 0)
405 return r;
406 if (r == 0) {
407 source->state = STATE_EOF;
408 return 0;
409 }
410
411 assert(data);
412
413 field = (char*) data - sizeof(uint64_t) - source->field_len;
414 memmove(field + sizeof(uint64_t), field, source->field_len);
415
416 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
417 if (r < 0)
418 return r;
419
420 source->state = STATE_DATA_FINISH;
421
422 return 0; /* continue */
423 }
424
425 case STATE_DATA_FINISH:
426 r = get_data_newline(source);
427 // log_debug("get_data_newline() -> %d", r);
428 if (r < 0)
429 return r;
430 if (r == 0) {
431 source->state = STATE_EOF;
432 return 0;
433 }
434
435 source->data_size = 0;
436 source->state = STATE_LINE;
437
438 return 0; /* continue */
439 default:
440 assert_not_reached("wtf?");
441 }
442 }
443
444 int process_source(RemoteSource *source, bool compress, bool seal) {
445 size_t remain, target;
446 int r;
447
448 assert(source);
449 assert(source->writer);
450
451 r = process_data(source);
452 if (r <= 0)
453 return r;
454
455 /* We have a full event */
456 log_trace("Received full event from source@%p fd:%d (%s)",
457 source, source->fd, source->name);
458
459 if (!source->iovw.count) {
460 log_warning("Entry with no payload, skipping");
461 goto freeing;
462 }
463
464 assert(source->iovw.iovec);
465 assert(source->iovw.count);
466
467 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
468 if (r < 0)
469 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
470 iovw_size(&source->iovw));
471 else
472 r = 1;
473
474 freeing:
475 iovw_free_contents(&source->iovw);
476
477 /* possibly reset buffer position */
478 remain = source->filled - source->offset;
479
480 if (remain == 0) /* no brainer */
481 source->offset = source->scanned = source->filled = 0;
482 else if (source->offset > source->size - source->filled &&
483 source->offset > remain) {
484 memcpy(source->buf, source->buf + source->offset, remain);
485 source->offset = source->scanned = 0;
486 source->filled = remain;
487 }
488
489 target = source->size;
490 while (target > 16 * LINE_CHUNK && remain < target / 2)
491 target /= 2;
492 if (target < source->size) {
493 char *tmp;
494
495 tmp = realloc(source->buf, target);
496 if (!tmp)
497 log_warning("Failed to reallocate buffer to (smaller) size %zu",
498 target);
499 else {
500 log_debug("Reallocated buffer from %zu to %zu bytes",
501 source->size, target);
502 source->buf = tmp;
503 source->size = target;
504 }
505 }
506
507 return r;
508 }