]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-remote-parse.c
util-lib: split string parsing related calls from util.[ch] into parse-util.[ch]
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
fdfccdbc
ZJS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
3ffd4af2
LP
22#include "fd-util.h"
23#include "journal-remote-parse.h"
fdfccdbc 24#include "journald-native.h"
6bedfcbb 25#include "parse-util.h"
07630cea 26#include "string-util.h"
fdfccdbc 27
/* Granularity, in bytes, by which the line buffer is grown. Parenthesized so
 * the expansion is a single operand in any surrounding expression. */
#define LINE_CHUNK (8*1024u)
fdfccdbc
ZJS
29
30void source_free(RemoteSource *source) {
31 if (!source)
32 return;
33
9ff48d09 34 if (source->fd >= 0 && !source->passive_fd) {
fdfccdbc 35 log_debug("Closing fd:%d (%s)", source->fd, source->name);
8201af08 36 safe_close(source->fd);
fdfccdbc 37 }
9ff48d09 38
fdfccdbc
ZJS
39 free(source->name);
40 free(source->buf);
41 iovw_free_contents(&source->iovw);
8201af08 42
1fa2f38f 43 log_debug("Writer ref count %i", source->writer->n_ref);
9ff48d09
ZJS
44 writer_unref(source->writer);
45
8201af08 46 sd_event_source_unref(source->event);
043945b9 47 sd_event_source_unref(source->buffer_event);
8201af08 48
fdfccdbc
ZJS
49 free(source);
50}
51
9ff48d09
ZJS
52/**
53 * Initialize zero-filled source with given values. On success, takes
54 * ownerhship of fd and writer, otherwise does not touch them.
55 */
56RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
57
58 RemoteSource *source;
59
60 log_debug("Creating source for %sfd:%d (%s)",
61 passive_fd ? "passive " : "", fd, name);
62
63 assert(fd >= 0);
64
65 source = new0(RemoteSource, 1);
66 if (!source)
67 return NULL;
68
69 source->fd = fd;
70 source->passive_fd = passive_fd;
71 source->name = name;
72 source->writer = writer;
73
74 return source;
75}
76
92b10cbc
ZJS
77static char* realloc_buffer(RemoteSource *source, size_t size) {
78 char *b, *old = source->buf;
79
80 b = GREEDY_REALLOC(source->buf, source->size, size);
81 if (!b)
82 return NULL;
83
84 iovw_rebase(&source->iovw, old, source->buf);
85
86 return b;
87}
88
fdfccdbc 89static int get_line(RemoteSource *source, char **line, size_t *size) {
92b10cbc 90 ssize_t n;
60921179 91 char *c = NULL;
fdfccdbc
ZJS
92
93 assert(source);
94 assert(source->state == STATE_LINE);
92b10cbc 95 assert(source->offset <= source->filled);
fdfccdbc
ZJS
96 assert(source->filled <= source->size);
97 assert(source->buf == NULL || source->size > 0);
9ff48d09 98 assert(source->fd >= 0);
fdfccdbc 99
57255510 100 for (;;) {
92b10cbc
ZJS
101 if (source->buf) {
102 size_t start = MAX(source->scanned, source->offset);
103
104 c = memchr(source->buf + start, '\n',
105 source->filled - start);
106 if (c != NULL)
107 break;
108 }
851d4e2a
ZJS
109
110 source->scanned = source->filled;
111 if (source->scanned >= DATA_SIZE_MAX) {
112 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
113 return -E2BIG;
114 }
60921179 115
9ff48d09 116 if (source->passive_fd)
851d4e2a 117 /* we have to wait for some data to come to us */
ff55c3c7 118 return -EAGAIN;
fdfccdbc 119
39d0fd9c
ZJS
120 /* We know that source->filled is at most DATA_SIZE_MAX, so if
121 we reallocate it, we'll increase the size at least a bit. */
122 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
851d4e2a 123 if (source->size - source->filled < LINE_CHUNK &&
39d0fd9c 124 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
851d4e2a 125 return log_oom();
9786767a 126
39d0fd9c 127 assert(source->buf);
4a0a6ac0 128 assert(source->size - source->filled >= LINE_CHUNK ||
92b10cbc 129 source->size == ENTRY_SIZE_MAX);
fdfccdbc 130
39d0fd9c
ZJS
131 n = read(source->fd,
132 source->buf + source->filled,
4a0a6ac0 133 source->size - source->filled);
851d4e2a 134 if (n < 0) {
ff55c3c7 135 if (errno != EAGAIN)
39d0fd9c
ZJS
136 log_error_errno(errno, "read(%d, ..., %zu): %m",
137 source->fd,
1fa2f38f 138 source->size - source->filled);
851d4e2a
ZJS
139 return -errno;
140 } else if (n == 0)
141 return 0;
fdfccdbc 142
851d4e2a
ZJS
143 source->filled += n;
144 }
fdfccdbc 145
92b10cbc
ZJS
146 *line = source->buf + source->offset;
147 *size = c + 1 - source->buf - source->offset;
148 source->offset += *size;
fdfccdbc
ZJS
149
150 return 1;
151}
152
cc64d017
ZJS
153int push_data(RemoteSource *source, const char *data, size_t size) {
154 assert(source);
155 assert(source->state != STATE_EOF);
156
92b10cbc 157 if (!realloc_buffer(source, source->filled + size)) {
851d4e2a
ZJS
158 log_error("Failed to store received data of size %zu "
159 "(in addition to existing %zu bytes with %zu filled): %s",
160 size, source->size, source->filled, strerror(ENOMEM));
161 return -ENOMEM;
162 }
cc64d017
ZJS
163
164 memcpy(source->buf + source->filled, data, size);
165 source->filled += size;
166
167 return 0;
168}
169
fdfccdbc 170static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
fdfccdbc
ZJS
171
172 assert(source);
173 assert(source->state == STATE_DATA_START ||
174 source->state == STATE_DATA ||
175 source->state == STATE_DATA_FINISH);
176 assert(size <= DATA_SIZE_MAX);
92b10cbc 177 assert(source->offset <= source->filled);
fdfccdbc
ZJS
178 assert(source->filled <= source->size);
179 assert(source->buf != NULL || source->size == 0);
180 assert(source->buf == NULL || source->size > 0);
9ff48d09 181 assert(source->fd >= 0);
fdfccdbc
ZJS
182 assert(data);
183
92b10cbc
ZJS
184 while (source->filled - source->offset < size) {
185 int n;
186
9ff48d09 187 if (source->passive_fd)
9786767a 188 /* we have to wait for some data to come to us */
ff55c3c7 189 return -EAGAIN;
9786767a 190
92b10cbc 191 if (!realloc_buffer(source, source->offset + size))
fdfccdbc
ZJS
192 return log_oom();
193
194 n = read(source->fd, source->buf + source->filled,
195 source->size - source->filled);
196 if (n < 0) {
ff55c3c7 197 if (errno != EAGAIN)
1fa2f38f
ZJS
198 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
199 source->size - source->filled);
fdfccdbc
ZJS
200 return -errno;
201 } else if (n == 0)
202 return 0;
203
204 source->filled += n;
205 }
206
92b10cbc
ZJS
207 *data = source->buf + source->offset;
208 source->offset += size;
fdfccdbc
ZJS
209
210 return 1;
211}
212
213static int get_data_size(RemoteSource *source) {
214 int r;
92b10cbc 215 void *data;
fdfccdbc
ZJS
216
217 assert(source);
218 assert(source->state == STATE_DATA_START);
219 assert(source->data_size == 0);
220
221 r = fill_fixed_size(source, &data, sizeof(uint64_t));
222 if (r <= 0)
223 return r;
224
225 source->data_size = le64toh( *(uint64_t *) data );
226 if (source->data_size > DATA_SIZE_MAX) {
a83f4037 227 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
fdfccdbc
ZJS
228 source->data_size, DATA_SIZE_MAX);
229 return -EINVAL;
230 }
231 if (source->data_size == 0)
232 log_warning("Binary field with zero length");
233
234 return 1;
235}
236
237static int get_data_data(RemoteSource *source, void **data) {
238 int r;
239
240 assert(source);
241 assert(data);
242 assert(source->state == STATE_DATA);
243
244 r = fill_fixed_size(source, data, source->data_size);
245 if (r <= 0)
246 return r;
247
248 return 1;
249}
250
251static int get_data_newline(RemoteSource *source) {
252 int r;
92b10cbc 253 char *data;
fdfccdbc
ZJS
254
255 assert(source);
256 assert(source->state == STATE_DATA_FINISH);
257
258 r = fill_fixed_size(source, (void**) &data, 1);
259 if (r <= 0)
260 return r;
261
262 assert(data);
263 if (*data != '\n') {
264 log_error("expected newline, got '%c'", *data);
265 return -EINVAL;
266 }
267
268 return 1;
269}
270
271static int process_dunder(RemoteSource *source, char *line, size_t n) {
272 const char *timestamp;
273 int r;
274
275 assert(line);
276 assert(n > 0);
277 assert(line[n-1] == '\n');
278
279 /* XXX: is it worth to support timestamps in extended format?
280 * We don't produce them, but who knows... */
281
282 timestamp = startswith(line, "__CURSOR=");
283 if (timestamp)
284 /* ignore __CURSOR */
285 return 1;
286
287 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
288 if (timestamp) {
289 long long unsigned x;
290 line[n-1] = '\0';
291 r = safe_atollu(timestamp, &x);
292 if (r < 0)
293 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
294 else
295 source->ts.realtime = x;
296 return r < 0 ? r : 1;
297 }
298
299 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
300 if (timestamp) {
301 long long unsigned x;
302 line[n-1] = '\0';
303 r = safe_atollu(timestamp, &x);
304 if (r < 0)
305 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
306 else
307 source->ts.monotonic = x;
308 return r < 0 ? r : 1;
309 }
310
311 timestamp = startswith(line, "__");
312 if (timestamp) {
313 log_notice("Unknown dunder line %s", line);
314 return 1;
315 }
316
317 /* no dunder */
318 return 0;
319}
320
be7e1319 321static int process_data(RemoteSource *source) {
fdfccdbc
ZJS
322 int r;
323
324 switch(source->state) {
325 case STATE_LINE: {
326 char *line, *sep;
a7f7d1bd 327 size_t n = 0;
fdfccdbc
ZJS
328
329 assert(source->data_size == 0);
330
331 r = get_line(source, &line, &n);
332 if (r < 0)
333 return r;
334 if (r == 0) {
335 source->state = STATE_EOF;
336 return r;
337 }
338 assert(n > 0);
339 assert(line[n-1] == '\n');
340
341 if (n == 1) {
cb41ff29 342 log_trace("Received empty line, event is ready");
fdfccdbc
ZJS
343 return 1;
344 }
345
346 r = process_dunder(source, line, n);
92b10cbc 347 if (r != 0)
fdfccdbc 348 return r < 0 ? r : 0;
fdfccdbc
ZJS
349
350 /* MESSAGE=xxx\n
351 or
352 COREDUMP\n
353 LLLLLLLL0011223344...\n
354 */
355 sep = memchr(line, '=', n);
09d801a8 356 if (sep) {
fdfccdbc
ZJS
357 /* chomp newline */
358 n--;
09d801a8
ZJS
359
360 r = iovw_put(&source->iovw, line, n);
361 if (r < 0)
362 return r;
363 } else {
fdfccdbc
ZJS
364 /* replace \n with = */
365 line[n-1] = '=';
fdfccdbc 366
09d801a8
ZJS
367 source->field_len = n;
368 source->state = STATE_DATA_START;
369
370 /* we cannot put the field in iovec until we have all data */
fdfccdbc
ZJS
371 }
372
09d801a8
ZJS
373 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
374
fdfccdbc
ZJS
375 return 0; /* continue */
376 }
377
378 case STATE_DATA_START:
379 assert(source->data_size == 0);
380
381 r = get_data_size(source);
dd87b184 382 // log_debug("get_data_size() -> %d", r);
fdfccdbc
ZJS
383 if (r < 0)
384 return r;
385 if (r == 0) {
386 source->state = STATE_EOF;
387 return 0;
388 }
389
390 source->state = source->data_size > 0 ?
391 STATE_DATA : STATE_DATA_FINISH;
392
393 return 0; /* continue */
394
395 case STATE_DATA: {
396 void *data;
09d801a8 397 char *field;
fdfccdbc
ZJS
398
399 assert(source->data_size > 0);
400
401 r = get_data_data(source, &data);
dd87b184 402 // log_debug("get_data_data() -> %d", r);
fdfccdbc
ZJS
403 if (r < 0)
404 return r;
405 if (r == 0) {
406 source->state = STATE_EOF;
407 return 0;
408 }
409
410 assert(data);
411
09d801a8
ZJS
412 field = (char*) data - sizeof(uint64_t) - source->field_len;
413 memmove(field + sizeof(uint64_t), field, source->field_len);
414
415 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
416 if (r < 0)
fdfccdbc 417 return r;
fdfccdbc
ZJS
418
419 source->state = STATE_DATA_FINISH;
420
421 return 0; /* continue */
422 }
423
424 case STATE_DATA_FINISH:
425 r = get_data_newline(source);
dd87b184 426 // log_debug("get_data_newline() -> %d", r);
fdfccdbc
ZJS
427 if (r < 0)
428 return r;
429 if (r == 0) {
430 source->state = STATE_EOF;
431 return 0;
432 }
433
434 source->data_size = 0;
435 source->state = STATE_LINE;
436
437 return 0; /* continue */
438 default:
439 assert_not_reached("wtf?");
440 }
441}
442
9ff48d09 443int process_source(RemoteSource *source, bool compress, bool seal) {
92b10cbc 444 size_t remain, target;
fdfccdbc
ZJS
445 int r;
446
447 assert(source);
9ff48d09 448 assert(source->writer);
fdfccdbc
ZJS
449
450 r = process_data(source);
451 if (r <= 0)
452 return r;
453
454 /* We have a full event */
0e72da6f 455 log_trace("Received full event from source@%p fd:%d (%s)",
a83f4037 456 source, source->fd, source->name);
fdfccdbc
ZJS
457
458 if (!source->iovw.count) {
459 log_warning("Entry with no payload, skipping");
460 goto freeing;
461 }
462
463 assert(source->iovw.iovec);
464 assert(source->iovw.count);
465
9ff48d09 466 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
fdfccdbc 467 if (r < 0)
c33b3297
MS
468 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
469 iovw_size(&source->iovw));
fdfccdbc
ZJS
470 else
471 r = 1;
472
473 freeing:
474 iovw_free_contents(&source->iovw);
92b10cbc
ZJS
475
476 /* possibly reset buffer position */
477 remain = source->filled - source->offset;
478
479 if (remain == 0) /* no brainer */
480 source->offset = source->scanned = source->filled = 0;
481 else if (source->offset > source->size - source->filled &&
482 source->offset > remain) {
483 memcpy(source->buf, source->buf + source->offset, remain);
484 source->offset = source->scanned = 0;
485 source->filled = remain;
486 }
487
488 target = source->size;
489 while (target > 16 * LINE_CHUNK && remain < target / 2)
490 target /= 2;
491 if (target < source->size) {
492 char *tmp;
493
494 tmp = realloc(source->buf, target);
e4c38cc3 495 if (!tmp)
92b10cbc
ZJS
496 log_warning("Failed to reallocate buffer to (smaller) size %zu",
497 target);
498 else {
499 log_debug("Reallocated buffer from %zu to %zu bytes",
500 source->size, target);
501 source->buf = tmp;
502 source->size = target;
503 }
504 }
505
fdfccdbc
ZJS
506 return r;
507}