]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-remote-parse.c
util-lib: split our string related calls from util.[ch] into its own file string...
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
fdfccdbc
ZJS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
fdfccdbc 22#include "journald-native.h"
07630cea
LP
23#include "string-util.h"
24#include "journal-remote-parse.h"
fdfccdbc 25
/* Step (in bytes) by which the receive buffer is grown while reading
 * line-oriented data. The expansion is parenthesized so that the macro
 * behaves as a single value inside larger expressions (division, modulo,
 * ...). */
#define LINE_CHUNK (8*1024u)
fdfccdbc
ZJS
27
28void source_free(RemoteSource *source) {
29 if (!source)
30 return;
31
9ff48d09 32 if (source->fd >= 0 && !source->passive_fd) {
fdfccdbc 33 log_debug("Closing fd:%d (%s)", source->fd, source->name);
8201af08 34 safe_close(source->fd);
fdfccdbc 35 }
9ff48d09 36
fdfccdbc
ZJS
37 free(source->name);
38 free(source->buf);
39 iovw_free_contents(&source->iovw);
8201af08 40
1fa2f38f 41 log_debug("Writer ref count %i", source->writer->n_ref);
9ff48d09
ZJS
42 writer_unref(source->writer);
43
8201af08 44 sd_event_source_unref(source->event);
043945b9 45 sd_event_source_unref(source->buffer_event);
8201af08 46
fdfccdbc
ZJS
47 free(source);
48}
49
9ff48d09
ZJS
50/**
51 * Initialize zero-filled source with given values. On success, takes
52 * ownerhship of fd and writer, otherwise does not touch them.
53 */
54RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
55
56 RemoteSource *source;
57
58 log_debug("Creating source for %sfd:%d (%s)",
59 passive_fd ? "passive " : "", fd, name);
60
61 assert(fd >= 0);
62
63 source = new0(RemoteSource, 1);
64 if (!source)
65 return NULL;
66
67 source->fd = fd;
68 source->passive_fd = passive_fd;
69 source->name = name;
70 source->writer = writer;
71
72 return source;
73}
74
92b10cbc
ZJS
75static char* realloc_buffer(RemoteSource *source, size_t size) {
76 char *b, *old = source->buf;
77
78 b = GREEDY_REALLOC(source->buf, source->size, size);
79 if (!b)
80 return NULL;
81
82 iovw_rebase(&source->iovw, old, source->buf);
83
84 return b;
85}
86
fdfccdbc 87static int get_line(RemoteSource *source, char **line, size_t *size) {
92b10cbc 88 ssize_t n;
60921179 89 char *c = NULL;
fdfccdbc
ZJS
90
91 assert(source);
92 assert(source->state == STATE_LINE);
92b10cbc 93 assert(source->offset <= source->filled);
fdfccdbc
ZJS
94 assert(source->filled <= source->size);
95 assert(source->buf == NULL || source->size > 0);
9ff48d09 96 assert(source->fd >= 0);
fdfccdbc 97
57255510 98 for (;;) {
92b10cbc
ZJS
99 if (source->buf) {
100 size_t start = MAX(source->scanned, source->offset);
101
102 c = memchr(source->buf + start, '\n',
103 source->filled - start);
104 if (c != NULL)
105 break;
106 }
851d4e2a
ZJS
107
108 source->scanned = source->filled;
109 if (source->scanned >= DATA_SIZE_MAX) {
110 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
111 return -E2BIG;
112 }
60921179 113
9ff48d09 114 if (source->passive_fd)
851d4e2a 115 /* we have to wait for some data to come to us */
ff55c3c7 116 return -EAGAIN;
fdfccdbc 117
39d0fd9c
ZJS
118 /* We know that source->filled is at most DATA_SIZE_MAX, so if
119 we reallocate it, we'll increase the size at least a bit. */
120 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
851d4e2a 121 if (source->size - source->filled < LINE_CHUNK &&
39d0fd9c 122 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
851d4e2a 123 return log_oom();
9786767a 124
39d0fd9c 125 assert(source->buf);
4a0a6ac0 126 assert(source->size - source->filled >= LINE_CHUNK ||
92b10cbc 127 source->size == ENTRY_SIZE_MAX);
fdfccdbc 128
39d0fd9c
ZJS
129 n = read(source->fd,
130 source->buf + source->filled,
4a0a6ac0 131 source->size - source->filled);
851d4e2a 132 if (n < 0) {
ff55c3c7 133 if (errno != EAGAIN)
39d0fd9c
ZJS
134 log_error_errno(errno, "read(%d, ..., %zu): %m",
135 source->fd,
1fa2f38f 136 source->size - source->filled);
851d4e2a
ZJS
137 return -errno;
138 } else if (n == 0)
139 return 0;
fdfccdbc 140
851d4e2a
ZJS
141 source->filled += n;
142 }
fdfccdbc 143
92b10cbc
ZJS
144 *line = source->buf + source->offset;
145 *size = c + 1 - source->buf - source->offset;
146 source->offset += *size;
fdfccdbc
ZJS
147
148 return 1;
149}
150
cc64d017
ZJS
151int push_data(RemoteSource *source, const char *data, size_t size) {
152 assert(source);
153 assert(source->state != STATE_EOF);
154
92b10cbc 155 if (!realloc_buffer(source, source->filled + size)) {
851d4e2a
ZJS
156 log_error("Failed to store received data of size %zu "
157 "(in addition to existing %zu bytes with %zu filled): %s",
158 size, source->size, source->filled, strerror(ENOMEM));
159 return -ENOMEM;
160 }
cc64d017
ZJS
161
162 memcpy(source->buf + source->filled, data, size);
163 source->filled += size;
164
165 return 0;
166}
167
fdfccdbc 168static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
fdfccdbc
ZJS
169
170 assert(source);
171 assert(source->state == STATE_DATA_START ||
172 source->state == STATE_DATA ||
173 source->state == STATE_DATA_FINISH);
174 assert(size <= DATA_SIZE_MAX);
92b10cbc 175 assert(source->offset <= source->filled);
fdfccdbc
ZJS
176 assert(source->filled <= source->size);
177 assert(source->buf != NULL || source->size == 0);
178 assert(source->buf == NULL || source->size > 0);
9ff48d09 179 assert(source->fd >= 0);
fdfccdbc
ZJS
180 assert(data);
181
92b10cbc
ZJS
182 while (source->filled - source->offset < size) {
183 int n;
184
9ff48d09 185 if (source->passive_fd)
9786767a 186 /* we have to wait for some data to come to us */
ff55c3c7 187 return -EAGAIN;
9786767a 188
92b10cbc 189 if (!realloc_buffer(source, source->offset + size))
fdfccdbc
ZJS
190 return log_oom();
191
192 n = read(source->fd, source->buf + source->filled,
193 source->size - source->filled);
194 if (n < 0) {
ff55c3c7 195 if (errno != EAGAIN)
1fa2f38f
ZJS
196 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
197 source->size - source->filled);
fdfccdbc
ZJS
198 return -errno;
199 } else if (n == 0)
200 return 0;
201
202 source->filled += n;
203 }
204
92b10cbc
ZJS
205 *data = source->buf + source->offset;
206 source->offset += size;
fdfccdbc
ZJS
207
208 return 1;
209}
210
211static int get_data_size(RemoteSource *source) {
212 int r;
92b10cbc 213 void *data;
fdfccdbc
ZJS
214
215 assert(source);
216 assert(source->state == STATE_DATA_START);
217 assert(source->data_size == 0);
218
219 r = fill_fixed_size(source, &data, sizeof(uint64_t));
220 if (r <= 0)
221 return r;
222
223 source->data_size = le64toh( *(uint64_t *) data );
224 if (source->data_size > DATA_SIZE_MAX) {
a83f4037 225 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
fdfccdbc
ZJS
226 source->data_size, DATA_SIZE_MAX);
227 return -EINVAL;
228 }
229 if (source->data_size == 0)
230 log_warning("Binary field with zero length");
231
232 return 1;
233}
234
235static int get_data_data(RemoteSource *source, void **data) {
236 int r;
237
238 assert(source);
239 assert(data);
240 assert(source->state == STATE_DATA);
241
242 r = fill_fixed_size(source, data, source->data_size);
243 if (r <= 0)
244 return r;
245
246 return 1;
247}
248
249static int get_data_newline(RemoteSource *source) {
250 int r;
92b10cbc 251 char *data;
fdfccdbc
ZJS
252
253 assert(source);
254 assert(source->state == STATE_DATA_FINISH);
255
256 r = fill_fixed_size(source, (void**) &data, 1);
257 if (r <= 0)
258 return r;
259
260 assert(data);
261 if (*data != '\n') {
262 log_error("expected newline, got '%c'", *data);
263 return -EINVAL;
264 }
265
266 return 1;
267}
268
269static int process_dunder(RemoteSource *source, char *line, size_t n) {
270 const char *timestamp;
271 int r;
272
273 assert(line);
274 assert(n > 0);
275 assert(line[n-1] == '\n');
276
277 /* XXX: is it worth to support timestamps in extended format?
278 * We don't produce them, but who knows... */
279
280 timestamp = startswith(line, "__CURSOR=");
281 if (timestamp)
282 /* ignore __CURSOR */
283 return 1;
284
285 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
286 if (timestamp) {
287 long long unsigned x;
288 line[n-1] = '\0';
289 r = safe_atollu(timestamp, &x);
290 if (r < 0)
291 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
292 else
293 source->ts.realtime = x;
294 return r < 0 ? r : 1;
295 }
296
297 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
298 if (timestamp) {
299 long long unsigned x;
300 line[n-1] = '\0';
301 r = safe_atollu(timestamp, &x);
302 if (r < 0)
303 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
304 else
305 source->ts.monotonic = x;
306 return r < 0 ? r : 1;
307 }
308
309 timestamp = startswith(line, "__");
310 if (timestamp) {
311 log_notice("Unknown dunder line %s", line);
312 return 1;
313 }
314
315 /* no dunder */
316 return 0;
317}
318
be7e1319 319static int process_data(RemoteSource *source) {
fdfccdbc
ZJS
320 int r;
321
322 switch(source->state) {
323 case STATE_LINE: {
324 char *line, *sep;
a7f7d1bd 325 size_t n = 0;
fdfccdbc
ZJS
326
327 assert(source->data_size == 0);
328
329 r = get_line(source, &line, &n);
330 if (r < 0)
331 return r;
332 if (r == 0) {
333 source->state = STATE_EOF;
334 return r;
335 }
336 assert(n > 0);
337 assert(line[n-1] == '\n');
338
339 if (n == 1) {
cb41ff29 340 log_trace("Received empty line, event is ready");
fdfccdbc
ZJS
341 return 1;
342 }
343
344 r = process_dunder(source, line, n);
92b10cbc 345 if (r != 0)
fdfccdbc 346 return r < 0 ? r : 0;
fdfccdbc
ZJS
347
348 /* MESSAGE=xxx\n
349 or
350 COREDUMP\n
351 LLLLLLLL0011223344...\n
352 */
353 sep = memchr(line, '=', n);
09d801a8 354 if (sep) {
fdfccdbc
ZJS
355 /* chomp newline */
356 n--;
09d801a8
ZJS
357
358 r = iovw_put(&source->iovw, line, n);
359 if (r < 0)
360 return r;
361 } else {
fdfccdbc
ZJS
362 /* replace \n with = */
363 line[n-1] = '=';
fdfccdbc 364
09d801a8
ZJS
365 source->field_len = n;
366 source->state = STATE_DATA_START;
367
368 /* we cannot put the field in iovec until we have all data */
fdfccdbc
ZJS
369 }
370
09d801a8
ZJS
371 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
372
fdfccdbc
ZJS
373 return 0; /* continue */
374 }
375
376 case STATE_DATA_START:
377 assert(source->data_size == 0);
378
379 r = get_data_size(source);
dd87b184 380 // log_debug("get_data_size() -> %d", r);
fdfccdbc
ZJS
381 if (r < 0)
382 return r;
383 if (r == 0) {
384 source->state = STATE_EOF;
385 return 0;
386 }
387
388 source->state = source->data_size > 0 ?
389 STATE_DATA : STATE_DATA_FINISH;
390
391 return 0; /* continue */
392
393 case STATE_DATA: {
394 void *data;
09d801a8 395 char *field;
fdfccdbc
ZJS
396
397 assert(source->data_size > 0);
398
399 r = get_data_data(source, &data);
dd87b184 400 // log_debug("get_data_data() -> %d", r);
fdfccdbc
ZJS
401 if (r < 0)
402 return r;
403 if (r == 0) {
404 source->state = STATE_EOF;
405 return 0;
406 }
407
408 assert(data);
409
09d801a8
ZJS
410 field = (char*) data - sizeof(uint64_t) - source->field_len;
411 memmove(field + sizeof(uint64_t), field, source->field_len);
412
413 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
414 if (r < 0)
fdfccdbc 415 return r;
fdfccdbc
ZJS
416
417 source->state = STATE_DATA_FINISH;
418
419 return 0; /* continue */
420 }
421
422 case STATE_DATA_FINISH:
423 r = get_data_newline(source);
dd87b184 424 // log_debug("get_data_newline() -> %d", r);
fdfccdbc
ZJS
425 if (r < 0)
426 return r;
427 if (r == 0) {
428 source->state = STATE_EOF;
429 return 0;
430 }
431
432 source->data_size = 0;
433 source->state = STATE_LINE;
434
435 return 0; /* continue */
436 default:
437 assert_not_reached("wtf?");
438 }
439}
440
9ff48d09 441int process_source(RemoteSource *source, bool compress, bool seal) {
92b10cbc 442 size_t remain, target;
fdfccdbc
ZJS
443 int r;
444
445 assert(source);
9ff48d09 446 assert(source->writer);
fdfccdbc
ZJS
447
448 r = process_data(source);
449 if (r <= 0)
450 return r;
451
452 /* We have a full event */
0e72da6f 453 log_trace("Received full event from source@%p fd:%d (%s)",
a83f4037 454 source, source->fd, source->name);
fdfccdbc
ZJS
455
456 if (!source->iovw.count) {
457 log_warning("Entry with no payload, skipping");
458 goto freeing;
459 }
460
461 assert(source->iovw.iovec);
462 assert(source->iovw.count);
463
9ff48d09 464 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
fdfccdbc 465 if (r < 0)
c33b3297
MS
466 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
467 iovw_size(&source->iovw));
fdfccdbc
ZJS
468 else
469 r = 1;
470
471 freeing:
472 iovw_free_contents(&source->iovw);
92b10cbc
ZJS
473
474 /* possibly reset buffer position */
475 remain = source->filled - source->offset;
476
477 if (remain == 0) /* no brainer */
478 source->offset = source->scanned = source->filled = 0;
479 else if (source->offset > source->size - source->filled &&
480 source->offset > remain) {
481 memcpy(source->buf, source->buf + source->offset, remain);
482 source->offset = source->scanned = 0;
483 source->filled = remain;
484 }
485
486 target = source->size;
487 while (target > 16 * LINE_CHUNK && remain < target / 2)
488 target /= 2;
489 if (target < source->size) {
490 char *tmp;
491
492 tmp = realloc(source->buf, target);
e4c38cc3 493 if (!tmp)
92b10cbc
ZJS
494 log_warning("Failed to reallocate buffer to (smaller) size %zu",
495 target);
496 else {
497 log_debug("Reallocated buffer from %zu to %zu bytes",
498 source->size, target);
499 source->buf = tmp;
500 source->size = target;
501 }
502 }
503
fdfccdbc
ZJS
504 return r;
505}