]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-remote-parse.c
Merge pull request #1668 from ssahani/net1
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
fdfccdbc
ZJS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
3ffd4af2
LP
22#include "fd-util.h"
23#include "journal-remote-parse.h"
fdfccdbc 24#include "journald-native.h"
07630cea 25#include "string-util.h"
fdfccdbc 26
/* Unit, in bytes, used when sizing the receive buffer. The expansion is
 * parenthesized so that it stays a single operand in any expression
 * (e.g. next to a division or a cast). */
#define LINE_CHUNK (8*1024u)
fdfccdbc
ZJS
28
29void source_free(RemoteSource *source) {
30 if (!source)
31 return;
32
9ff48d09 33 if (source->fd >= 0 && !source->passive_fd) {
fdfccdbc 34 log_debug("Closing fd:%d (%s)", source->fd, source->name);
8201af08 35 safe_close(source->fd);
fdfccdbc 36 }
9ff48d09 37
fdfccdbc
ZJS
38 free(source->name);
39 free(source->buf);
40 iovw_free_contents(&source->iovw);
8201af08 41
1fa2f38f 42 log_debug("Writer ref count %i", source->writer->n_ref);
9ff48d09
ZJS
43 writer_unref(source->writer);
44
8201af08 45 sd_event_source_unref(source->event);
043945b9 46 sd_event_source_unref(source->buffer_event);
8201af08 47
fdfccdbc
ZJS
48 free(source);
49}
50
9ff48d09
ZJS
51/**
52 * Initialize zero-filled source with given values. On success, takes
53 * ownerhship of fd and writer, otherwise does not touch them.
54 */
55RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
56
57 RemoteSource *source;
58
59 log_debug("Creating source for %sfd:%d (%s)",
60 passive_fd ? "passive " : "", fd, name);
61
62 assert(fd >= 0);
63
64 source = new0(RemoteSource, 1);
65 if (!source)
66 return NULL;
67
68 source->fd = fd;
69 source->passive_fd = passive_fd;
70 source->name = name;
71 source->writer = writer;
72
73 return source;
74}
75
92b10cbc
ZJS
76static char* realloc_buffer(RemoteSource *source, size_t size) {
77 char *b, *old = source->buf;
78
79 b = GREEDY_REALLOC(source->buf, source->size, size);
80 if (!b)
81 return NULL;
82
83 iovw_rebase(&source->iovw, old, source->buf);
84
85 return b;
86}
87
fdfccdbc 88static int get_line(RemoteSource *source, char **line, size_t *size) {
92b10cbc 89 ssize_t n;
60921179 90 char *c = NULL;
fdfccdbc
ZJS
91
92 assert(source);
93 assert(source->state == STATE_LINE);
92b10cbc 94 assert(source->offset <= source->filled);
fdfccdbc
ZJS
95 assert(source->filled <= source->size);
96 assert(source->buf == NULL || source->size > 0);
9ff48d09 97 assert(source->fd >= 0);
fdfccdbc 98
57255510 99 for (;;) {
92b10cbc
ZJS
100 if (source->buf) {
101 size_t start = MAX(source->scanned, source->offset);
102
103 c = memchr(source->buf + start, '\n',
104 source->filled - start);
105 if (c != NULL)
106 break;
107 }
851d4e2a
ZJS
108
109 source->scanned = source->filled;
110 if (source->scanned >= DATA_SIZE_MAX) {
111 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
112 return -E2BIG;
113 }
60921179 114
9ff48d09 115 if (source->passive_fd)
851d4e2a 116 /* we have to wait for some data to come to us */
ff55c3c7 117 return -EAGAIN;
fdfccdbc 118
39d0fd9c
ZJS
119 /* We know that source->filled is at most DATA_SIZE_MAX, so if
120 we reallocate it, we'll increase the size at least a bit. */
121 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
851d4e2a 122 if (source->size - source->filled < LINE_CHUNK &&
39d0fd9c 123 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
851d4e2a 124 return log_oom();
9786767a 125
39d0fd9c 126 assert(source->buf);
4a0a6ac0 127 assert(source->size - source->filled >= LINE_CHUNK ||
92b10cbc 128 source->size == ENTRY_SIZE_MAX);
fdfccdbc 129
39d0fd9c
ZJS
130 n = read(source->fd,
131 source->buf + source->filled,
4a0a6ac0 132 source->size - source->filled);
851d4e2a 133 if (n < 0) {
ff55c3c7 134 if (errno != EAGAIN)
39d0fd9c
ZJS
135 log_error_errno(errno, "read(%d, ..., %zu): %m",
136 source->fd,
1fa2f38f 137 source->size - source->filled);
851d4e2a
ZJS
138 return -errno;
139 } else if (n == 0)
140 return 0;
fdfccdbc 141
851d4e2a
ZJS
142 source->filled += n;
143 }
fdfccdbc 144
92b10cbc
ZJS
145 *line = source->buf + source->offset;
146 *size = c + 1 - source->buf - source->offset;
147 source->offset += *size;
fdfccdbc
ZJS
148
149 return 1;
150}
151
cc64d017
ZJS
152int push_data(RemoteSource *source, const char *data, size_t size) {
153 assert(source);
154 assert(source->state != STATE_EOF);
155
92b10cbc 156 if (!realloc_buffer(source, source->filled + size)) {
851d4e2a
ZJS
157 log_error("Failed to store received data of size %zu "
158 "(in addition to existing %zu bytes with %zu filled): %s",
159 size, source->size, source->filled, strerror(ENOMEM));
160 return -ENOMEM;
161 }
cc64d017
ZJS
162
163 memcpy(source->buf + source->filled, data, size);
164 source->filled += size;
165
166 return 0;
167}
168
fdfccdbc 169static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
fdfccdbc
ZJS
170
171 assert(source);
172 assert(source->state == STATE_DATA_START ||
173 source->state == STATE_DATA ||
174 source->state == STATE_DATA_FINISH);
175 assert(size <= DATA_SIZE_MAX);
92b10cbc 176 assert(source->offset <= source->filled);
fdfccdbc
ZJS
177 assert(source->filled <= source->size);
178 assert(source->buf != NULL || source->size == 0);
179 assert(source->buf == NULL || source->size > 0);
9ff48d09 180 assert(source->fd >= 0);
fdfccdbc
ZJS
181 assert(data);
182
92b10cbc
ZJS
183 while (source->filled - source->offset < size) {
184 int n;
185
9ff48d09 186 if (source->passive_fd)
9786767a 187 /* we have to wait for some data to come to us */
ff55c3c7 188 return -EAGAIN;
9786767a 189
92b10cbc 190 if (!realloc_buffer(source, source->offset + size))
fdfccdbc
ZJS
191 return log_oom();
192
193 n = read(source->fd, source->buf + source->filled,
194 source->size - source->filled);
195 if (n < 0) {
ff55c3c7 196 if (errno != EAGAIN)
1fa2f38f
ZJS
197 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
198 source->size - source->filled);
fdfccdbc
ZJS
199 return -errno;
200 } else if (n == 0)
201 return 0;
202
203 source->filled += n;
204 }
205
92b10cbc
ZJS
206 *data = source->buf + source->offset;
207 source->offset += size;
fdfccdbc
ZJS
208
209 return 1;
210}
211
212static int get_data_size(RemoteSource *source) {
213 int r;
92b10cbc 214 void *data;
fdfccdbc
ZJS
215
216 assert(source);
217 assert(source->state == STATE_DATA_START);
218 assert(source->data_size == 0);
219
220 r = fill_fixed_size(source, &data, sizeof(uint64_t));
221 if (r <= 0)
222 return r;
223
224 source->data_size = le64toh( *(uint64_t *) data );
225 if (source->data_size > DATA_SIZE_MAX) {
a83f4037 226 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
fdfccdbc
ZJS
227 source->data_size, DATA_SIZE_MAX);
228 return -EINVAL;
229 }
230 if (source->data_size == 0)
231 log_warning("Binary field with zero length");
232
233 return 1;
234}
235
236static int get_data_data(RemoteSource *source, void **data) {
237 int r;
238
239 assert(source);
240 assert(data);
241 assert(source->state == STATE_DATA);
242
243 r = fill_fixed_size(source, data, source->data_size);
244 if (r <= 0)
245 return r;
246
247 return 1;
248}
249
250static int get_data_newline(RemoteSource *source) {
251 int r;
92b10cbc 252 char *data;
fdfccdbc
ZJS
253
254 assert(source);
255 assert(source->state == STATE_DATA_FINISH);
256
257 r = fill_fixed_size(source, (void**) &data, 1);
258 if (r <= 0)
259 return r;
260
261 assert(data);
262 if (*data != '\n') {
263 log_error("expected newline, got '%c'", *data);
264 return -EINVAL;
265 }
266
267 return 1;
268}
269
270static int process_dunder(RemoteSource *source, char *line, size_t n) {
271 const char *timestamp;
272 int r;
273
274 assert(line);
275 assert(n > 0);
276 assert(line[n-1] == '\n');
277
278 /* XXX: is it worth to support timestamps in extended format?
279 * We don't produce them, but who knows... */
280
281 timestamp = startswith(line, "__CURSOR=");
282 if (timestamp)
283 /* ignore __CURSOR */
284 return 1;
285
286 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
287 if (timestamp) {
288 long long unsigned x;
289 line[n-1] = '\0';
290 r = safe_atollu(timestamp, &x);
291 if (r < 0)
292 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
293 else
294 source->ts.realtime = x;
295 return r < 0 ? r : 1;
296 }
297
298 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
299 if (timestamp) {
300 long long unsigned x;
301 line[n-1] = '\0';
302 r = safe_atollu(timestamp, &x);
303 if (r < 0)
304 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
305 else
306 source->ts.monotonic = x;
307 return r < 0 ? r : 1;
308 }
309
310 timestamp = startswith(line, "__");
311 if (timestamp) {
312 log_notice("Unknown dunder line %s", line);
313 return 1;
314 }
315
316 /* no dunder */
317 return 0;
318}
319
be7e1319 320static int process_data(RemoteSource *source) {
fdfccdbc
ZJS
321 int r;
322
323 switch(source->state) {
324 case STATE_LINE: {
325 char *line, *sep;
a7f7d1bd 326 size_t n = 0;
fdfccdbc
ZJS
327
328 assert(source->data_size == 0);
329
330 r = get_line(source, &line, &n);
331 if (r < 0)
332 return r;
333 if (r == 0) {
334 source->state = STATE_EOF;
335 return r;
336 }
337 assert(n > 0);
338 assert(line[n-1] == '\n');
339
340 if (n == 1) {
cb41ff29 341 log_trace("Received empty line, event is ready");
fdfccdbc
ZJS
342 return 1;
343 }
344
345 r = process_dunder(source, line, n);
92b10cbc 346 if (r != 0)
fdfccdbc 347 return r < 0 ? r : 0;
fdfccdbc
ZJS
348
349 /* MESSAGE=xxx\n
350 or
351 COREDUMP\n
352 LLLLLLLL0011223344...\n
353 */
354 sep = memchr(line, '=', n);
09d801a8 355 if (sep) {
fdfccdbc
ZJS
356 /* chomp newline */
357 n--;
09d801a8
ZJS
358
359 r = iovw_put(&source->iovw, line, n);
360 if (r < 0)
361 return r;
362 } else {
fdfccdbc
ZJS
363 /* replace \n with = */
364 line[n-1] = '=';
fdfccdbc 365
09d801a8
ZJS
366 source->field_len = n;
367 source->state = STATE_DATA_START;
368
369 /* we cannot put the field in iovec until we have all data */
fdfccdbc
ZJS
370 }
371
09d801a8
ZJS
372 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
373
fdfccdbc
ZJS
374 return 0; /* continue */
375 }
376
377 case STATE_DATA_START:
378 assert(source->data_size == 0);
379
380 r = get_data_size(source);
dd87b184 381 // log_debug("get_data_size() -> %d", r);
fdfccdbc
ZJS
382 if (r < 0)
383 return r;
384 if (r == 0) {
385 source->state = STATE_EOF;
386 return 0;
387 }
388
389 source->state = source->data_size > 0 ?
390 STATE_DATA : STATE_DATA_FINISH;
391
392 return 0; /* continue */
393
394 case STATE_DATA: {
395 void *data;
09d801a8 396 char *field;
fdfccdbc
ZJS
397
398 assert(source->data_size > 0);
399
400 r = get_data_data(source, &data);
dd87b184 401 // log_debug("get_data_data() -> %d", r);
fdfccdbc
ZJS
402 if (r < 0)
403 return r;
404 if (r == 0) {
405 source->state = STATE_EOF;
406 return 0;
407 }
408
409 assert(data);
410
09d801a8
ZJS
411 field = (char*) data - sizeof(uint64_t) - source->field_len;
412 memmove(field + sizeof(uint64_t), field, source->field_len);
413
414 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
415 if (r < 0)
fdfccdbc 416 return r;
fdfccdbc
ZJS
417
418 source->state = STATE_DATA_FINISH;
419
420 return 0; /* continue */
421 }
422
423 case STATE_DATA_FINISH:
424 r = get_data_newline(source);
dd87b184 425 // log_debug("get_data_newline() -> %d", r);
fdfccdbc
ZJS
426 if (r < 0)
427 return r;
428 if (r == 0) {
429 source->state = STATE_EOF;
430 return 0;
431 }
432
433 source->data_size = 0;
434 source->state = STATE_LINE;
435
436 return 0; /* continue */
437 default:
438 assert_not_reached("wtf?");
439 }
440}
441
9ff48d09 442int process_source(RemoteSource *source, bool compress, bool seal) {
92b10cbc 443 size_t remain, target;
fdfccdbc
ZJS
444 int r;
445
446 assert(source);
9ff48d09 447 assert(source->writer);
fdfccdbc
ZJS
448
449 r = process_data(source);
450 if (r <= 0)
451 return r;
452
453 /* We have a full event */
0e72da6f 454 log_trace("Received full event from source@%p fd:%d (%s)",
a83f4037 455 source, source->fd, source->name);
fdfccdbc
ZJS
456
457 if (!source->iovw.count) {
458 log_warning("Entry with no payload, skipping");
459 goto freeing;
460 }
461
462 assert(source->iovw.iovec);
463 assert(source->iovw.count);
464
9ff48d09 465 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
fdfccdbc 466 if (r < 0)
c33b3297
MS
467 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
468 iovw_size(&source->iovw));
fdfccdbc
ZJS
469 else
470 r = 1;
471
472 freeing:
473 iovw_free_contents(&source->iovw);
92b10cbc
ZJS
474
475 /* possibly reset buffer position */
476 remain = source->filled - source->offset;
477
478 if (remain == 0) /* no brainer */
479 source->offset = source->scanned = source->filled = 0;
480 else if (source->offset > source->size - source->filled &&
481 source->offset > remain) {
482 memcpy(source->buf, source->buf + source->offset, remain);
483 source->offset = source->scanned = 0;
484 source->filled = remain;
485 }
486
487 target = source->size;
488 while (target > 16 * LINE_CHUNK && remain < target / 2)
489 target /= 2;
490 if (target < source->size) {
491 char *tmp;
492
493 tmp = realloc(source->buf, target);
e4c38cc3 494 if (!tmp)
92b10cbc
ZJS
495 log_warning("Failed to reallocate buffer to (smaller) size %zu",
496 target);
497 else {
498 log_debug("Reallocated buffer from %zu to %zu bytes",
499 source->size, target);
500 source->buf = tmp;
501 source->size = target;
502 }
503 }
504
fdfccdbc
ZJS
505 return r;
506}