]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-remote-parse.c
util-lib: split out allocation calls into alloc-util.[ch]
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
fdfccdbc
ZJS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
b5efdb8a 22#include "alloc-util.h"
3ffd4af2
LP
23#include "fd-util.h"
24#include "journal-remote-parse.h"
fdfccdbc 25#include "journald-native.h"
6bedfcbb 26#include "parse-util.h"
07630cea 27#include "string-util.h"
fdfccdbc 28
/* Chunk size, in bytes, used when growing the read buffer in get_line() and
 * as the base of the shrink threshold in process_source(). Parenthesized so
 * that the macro always expands as a single operand, whatever operator
 * surrounds it. */
#define LINE_CHUNK (8*1024u)
fdfccdbc
ZJS
30
31void source_free(RemoteSource *source) {
32 if (!source)
33 return;
34
9ff48d09 35 if (source->fd >= 0 && !source->passive_fd) {
fdfccdbc 36 log_debug("Closing fd:%d (%s)", source->fd, source->name);
8201af08 37 safe_close(source->fd);
fdfccdbc 38 }
9ff48d09 39
fdfccdbc
ZJS
40 free(source->name);
41 free(source->buf);
42 iovw_free_contents(&source->iovw);
8201af08 43
1fa2f38f 44 log_debug("Writer ref count %i", source->writer->n_ref);
9ff48d09
ZJS
45 writer_unref(source->writer);
46
8201af08 47 sd_event_source_unref(source->event);
043945b9 48 sd_event_source_unref(source->buffer_event);
8201af08 49
fdfccdbc
ZJS
50 free(source);
51}
52
9ff48d09
ZJS
53/**
54 * Initialize zero-filled source with given values. On success, takes
55 * ownerhship of fd and writer, otherwise does not touch them.
56 */
57RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
58
59 RemoteSource *source;
60
61 log_debug("Creating source for %sfd:%d (%s)",
62 passive_fd ? "passive " : "", fd, name);
63
64 assert(fd >= 0);
65
66 source = new0(RemoteSource, 1);
67 if (!source)
68 return NULL;
69
70 source->fd = fd;
71 source->passive_fd = passive_fd;
72 source->name = name;
73 source->writer = writer;
74
75 return source;
76}
77
92b10cbc
ZJS
78static char* realloc_buffer(RemoteSource *source, size_t size) {
79 char *b, *old = source->buf;
80
81 b = GREEDY_REALLOC(source->buf, source->size, size);
82 if (!b)
83 return NULL;
84
85 iovw_rebase(&source->iovw, old, source->buf);
86
87 return b;
88}
89
fdfccdbc 90static int get_line(RemoteSource *source, char **line, size_t *size) {
92b10cbc 91 ssize_t n;
60921179 92 char *c = NULL;
fdfccdbc
ZJS
93
94 assert(source);
95 assert(source->state == STATE_LINE);
92b10cbc 96 assert(source->offset <= source->filled);
fdfccdbc
ZJS
97 assert(source->filled <= source->size);
98 assert(source->buf == NULL || source->size > 0);
9ff48d09 99 assert(source->fd >= 0);
fdfccdbc 100
57255510 101 for (;;) {
92b10cbc
ZJS
102 if (source->buf) {
103 size_t start = MAX(source->scanned, source->offset);
104
105 c = memchr(source->buf + start, '\n',
106 source->filled - start);
107 if (c != NULL)
108 break;
109 }
851d4e2a
ZJS
110
111 source->scanned = source->filled;
112 if (source->scanned >= DATA_SIZE_MAX) {
113 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
114 return -E2BIG;
115 }
60921179 116
9ff48d09 117 if (source->passive_fd)
851d4e2a 118 /* we have to wait for some data to come to us */
ff55c3c7 119 return -EAGAIN;
fdfccdbc 120
39d0fd9c
ZJS
121 /* We know that source->filled is at most DATA_SIZE_MAX, so if
122 we reallocate it, we'll increase the size at least a bit. */
123 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
851d4e2a 124 if (source->size - source->filled < LINE_CHUNK &&
39d0fd9c 125 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
851d4e2a 126 return log_oom();
9786767a 127
39d0fd9c 128 assert(source->buf);
4a0a6ac0 129 assert(source->size - source->filled >= LINE_CHUNK ||
92b10cbc 130 source->size == ENTRY_SIZE_MAX);
fdfccdbc 131
39d0fd9c
ZJS
132 n = read(source->fd,
133 source->buf + source->filled,
4a0a6ac0 134 source->size - source->filled);
851d4e2a 135 if (n < 0) {
ff55c3c7 136 if (errno != EAGAIN)
39d0fd9c
ZJS
137 log_error_errno(errno, "read(%d, ..., %zu): %m",
138 source->fd,
1fa2f38f 139 source->size - source->filled);
851d4e2a
ZJS
140 return -errno;
141 } else if (n == 0)
142 return 0;
fdfccdbc 143
851d4e2a
ZJS
144 source->filled += n;
145 }
fdfccdbc 146
92b10cbc
ZJS
147 *line = source->buf + source->offset;
148 *size = c + 1 - source->buf - source->offset;
149 source->offset += *size;
fdfccdbc
ZJS
150
151 return 1;
152}
153
cc64d017
ZJS
154int push_data(RemoteSource *source, const char *data, size_t size) {
155 assert(source);
156 assert(source->state != STATE_EOF);
157
92b10cbc 158 if (!realloc_buffer(source, source->filled + size)) {
851d4e2a
ZJS
159 log_error("Failed to store received data of size %zu "
160 "(in addition to existing %zu bytes with %zu filled): %s",
161 size, source->size, source->filled, strerror(ENOMEM));
162 return -ENOMEM;
163 }
cc64d017
ZJS
164
165 memcpy(source->buf + source->filled, data, size);
166 source->filled += size;
167
168 return 0;
169}
170
fdfccdbc 171static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
fdfccdbc
ZJS
172
173 assert(source);
174 assert(source->state == STATE_DATA_START ||
175 source->state == STATE_DATA ||
176 source->state == STATE_DATA_FINISH);
177 assert(size <= DATA_SIZE_MAX);
92b10cbc 178 assert(source->offset <= source->filled);
fdfccdbc
ZJS
179 assert(source->filled <= source->size);
180 assert(source->buf != NULL || source->size == 0);
181 assert(source->buf == NULL || source->size > 0);
9ff48d09 182 assert(source->fd >= 0);
fdfccdbc
ZJS
183 assert(data);
184
92b10cbc
ZJS
185 while (source->filled - source->offset < size) {
186 int n;
187
9ff48d09 188 if (source->passive_fd)
9786767a 189 /* we have to wait for some data to come to us */
ff55c3c7 190 return -EAGAIN;
9786767a 191
92b10cbc 192 if (!realloc_buffer(source, source->offset + size))
fdfccdbc
ZJS
193 return log_oom();
194
195 n = read(source->fd, source->buf + source->filled,
196 source->size - source->filled);
197 if (n < 0) {
ff55c3c7 198 if (errno != EAGAIN)
1fa2f38f
ZJS
199 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
200 source->size - source->filled);
fdfccdbc
ZJS
201 return -errno;
202 } else if (n == 0)
203 return 0;
204
205 source->filled += n;
206 }
207
92b10cbc
ZJS
208 *data = source->buf + source->offset;
209 source->offset += size;
fdfccdbc
ZJS
210
211 return 1;
212}
213
214static int get_data_size(RemoteSource *source) {
215 int r;
92b10cbc 216 void *data;
fdfccdbc
ZJS
217
218 assert(source);
219 assert(source->state == STATE_DATA_START);
220 assert(source->data_size == 0);
221
222 r = fill_fixed_size(source, &data, sizeof(uint64_t));
223 if (r <= 0)
224 return r;
225
226 source->data_size = le64toh( *(uint64_t *) data );
227 if (source->data_size > DATA_SIZE_MAX) {
a83f4037 228 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
fdfccdbc
ZJS
229 source->data_size, DATA_SIZE_MAX);
230 return -EINVAL;
231 }
232 if (source->data_size == 0)
233 log_warning("Binary field with zero length");
234
235 return 1;
236}
237
238static int get_data_data(RemoteSource *source, void **data) {
239 int r;
240
241 assert(source);
242 assert(data);
243 assert(source->state == STATE_DATA);
244
245 r = fill_fixed_size(source, data, source->data_size);
246 if (r <= 0)
247 return r;
248
249 return 1;
250}
251
252static int get_data_newline(RemoteSource *source) {
253 int r;
92b10cbc 254 char *data;
fdfccdbc
ZJS
255
256 assert(source);
257 assert(source->state == STATE_DATA_FINISH);
258
259 r = fill_fixed_size(source, (void**) &data, 1);
260 if (r <= 0)
261 return r;
262
263 assert(data);
264 if (*data != '\n') {
265 log_error("expected newline, got '%c'", *data);
266 return -EINVAL;
267 }
268
269 return 1;
270}
271
272static int process_dunder(RemoteSource *source, char *line, size_t n) {
273 const char *timestamp;
274 int r;
275
276 assert(line);
277 assert(n > 0);
278 assert(line[n-1] == '\n');
279
280 /* XXX: is it worth to support timestamps in extended format?
281 * We don't produce them, but who knows... */
282
283 timestamp = startswith(line, "__CURSOR=");
284 if (timestamp)
285 /* ignore __CURSOR */
286 return 1;
287
288 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
289 if (timestamp) {
290 long long unsigned x;
291 line[n-1] = '\0';
292 r = safe_atollu(timestamp, &x);
293 if (r < 0)
294 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
295 else
296 source->ts.realtime = x;
297 return r < 0 ? r : 1;
298 }
299
300 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
301 if (timestamp) {
302 long long unsigned x;
303 line[n-1] = '\0';
304 r = safe_atollu(timestamp, &x);
305 if (r < 0)
306 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
307 else
308 source->ts.monotonic = x;
309 return r < 0 ? r : 1;
310 }
311
312 timestamp = startswith(line, "__");
313 if (timestamp) {
314 log_notice("Unknown dunder line %s", line);
315 return 1;
316 }
317
318 /* no dunder */
319 return 0;
320}
321
be7e1319 322static int process_data(RemoteSource *source) {
fdfccdbc
ZJS
323 int r;
324
325 switch(source->state) {
326 case STATE_LINE: {
327 char *line, *sep;
a7f7d1bd 328 size_t n = 0;
fdfccdbc
ZJS
329
330 assert(source->data_size == 0);
331
332 r = get_line(source, &line, &n);
333 if (r < 0)
334 return r;
335 if (r == 0) {
336 source->state = STATE_EOF;
337 return r;
338 }
339 assert(n > 0);
340 assert(line[n-1] == '\n');
341
342 if (n == 1) {
cb41ff29 343 log_trace("Received empty line, event is ready");
fdfccdbc
ZJS
344 return 1;
345 }
346
347 r = process_dunder(source, line, n);
92b10cbc 348 if (r != 0)
fdfccdbc 349 return r < 0 ? r : 0;
fdfccdbc
ZJS
350
351 /* MESSAGE=xxx\n
352 or
353 COREDUMP\n
354 LLLLLLLL0011223344...\n
355 */
356 sep = memchr(line, '=', n);
09d801a8 357 if (sep) {
fdfccdbc
ZJS
358 /* chomp newline */
359 n--;
09d801a8
ZJS
360
361 r = iovw_put(&source->iovw, line, n);
362 if (r < 0)
363 return r;
364 } else {
fdfccdbc
ZJS
365 /* replace \n with = */
366 line[n-1] = '=';
fdfccdbc 367
09d801a8
ZJS
368 source->field_len = n;
369 source->state = STATE_DATA_START;
370
371 /* we cannot put the field in iovec until we have all data */
fdfccdbc
ZJS
372 }
373
09d801a8
ZJS
374 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
375
fdfccdbc
ZJS
376 return 0; /* continue */
377 }
378
379 case STATE_DATA_START:
380 assert(source->data_size == 0);
381
382 r = get_data_size(source);
dd87b184 383 // log_debug("get_data_size() -> %d", r);
fdfccdbc
ZJS
384 if (r < 0)
385 return r;
386 if (r == 0) {
387 source->state = STATE_EOF;
388 return 0;
389 }
390
391 source->state = source->data_size > 0 ?
392 STATE_DATA : STATE_DATA_FINISH;
393
394 return 0; /* continue */
395
396 case STATE_DATA: {
397 void *data;
09d801a8 398 char *field;
fdfccdbc
ZJS
399
400 assert(source->data_size > 0);
401
402 r = get_data_data(source, &data);
dd87b184 403 // log_debug("get_data_data() -> %d", r);
fdfccdbc
ZJS
404 if (r < 0)
405 return r;
406 if (r == 0) {
407 source->state = STATE_EOF;
408 return 0;
409 }
410
411 assert(data);
412
09d801a8
ZJS
413 field = (char*) data - sizeof(uint64_t) - source->field_len;
414 memmove(field + sizeof(uint64_t), field, source->field_len);
415
416 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
417 if (r < 0)
fdfccdbc 418 return r;
fdfccdbc
ZJS
419
420 source->state = STATE_DATA_FINISH;
421
422 return 0; /* continue */
423 }
424
425 case STATE_DATA_FINISH:
426 r = get_data_newline(source);
dd87b184 427 // log_debug("get_data_newline() -> %d", r);
fdfccdbc
ZJS
428 if (r < 0)
429 return r;
430 if (r == 0) {
431 source->state = STATE_EOF;
432 return 0;
433 }
434
435 source->data_size = 0;
436 source->state = STATE_LINE;
437
438 return 0; /* continue */
439 default:
440 assert_not_reached("wtf?");
441 }
442}
443
9ff48d09 444int process_source(RemoteSource *source, bool compress, bool seal) {
92b10cbc 445 size_t remain, target;
fdfccdbc
ZJS
446 int r;
447
448 assert(source);
9ff48d09 449 assert(source->writer);
fdfccdbc
ZJS
450
451 r = process_data(source);
452 if (r <= 0)
453 return r;
454
455 /* We have a full event */
0e72da6f 456 log_trace("Received full event from source@%p fd:%d (%s)",
a83f4037 457 source, source->fd, source->name);
fdfccdbc
ZJS
458
459 if (!source->iovw.count) {
460 log_warning("Entry with no payload, skipping");
461 goto freeing;
462 }
463
464 assert(source->iovw.iovec);
465 assert(source->iovw.count);
466
9ff48d09 467 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
fdfccdbc 468 if (r < 0)
c33b3297
MS
469 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
470 iovw_size(&source->iovw));
fdfccdbc
ZJS
471 else
472 r = 1;
473
474 freeing:
475 iovw_free_contents(&source->iovw);
92b10cbc
ZJS
476
477 /* possibly reset buffer position */
478 remain = source->filled - source->offset;
479
480 if (remain == 0) /* no brainer */
481 source->offset = source->scanned = source->filled = 0;
482 else if (source->offset > source->size - source->filled &&
483 source->offset > remain) {
484 memcpy(source->buf, source->buf + source->offset, remain);
485 source->offset = source->scanned = 0;
486 source->filled = remain;
487 }
488
489 target = source->size;
490 while (target > 16 * LINE_CHUNK && remain < target / 2)
491 target /= 2;
492 if (target < source->size) {
493 char *tmp;
494
495 tmp = realloc(source->buf, target);
e4c38cc3 496 if (!tmp)
92b10cbc
ZJS
497 log_warning("Failed to reallocate buffer to (smaller) size %zu",
498 target);
499 else {
500 log_debug("Reallocated buffer from %zu to %zu bytes",
501 source->size, target);
502 source->buf = tmp;
503 source->size = target;
504 }
505 }
506
fdfccdbc
ZJS
507 return r;
508}