]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
util-lib: split our string related calls from util.[ch] into its own file string...
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journald-native.h"
23 #include "string-util.h"
24 #include "journal-remote-parse.h"
25
26 #define LINE_CHUNK 8*1024u
27
28 void source_free(RemoteSource *source) {
29 if (!source)
30 return;
31
32 if (source->fd >= 0 && !source->passive_fd) {
33 log_debug("Closing fd:%d (%s)", source->fd, source->name);
34 safe_close(source->fd);
35 }
36
37 free(source->name);
38 free(source->buf);
39 iovw_free_contents(&source->iovw);
40
41 log_debug("Writer ref count %i", source->writer->n_ref);
42 writer_unref(source->writer);
43
44 sd_event_source_unref(source->event);
45 sd_event_source_unref(source->buffer_event);
46
47 free(source);
48 }
49
50 /**
51 * Initialize zero-filled source with given values. On success, takes
52 * ownerhship of fd and writer, otherwise does not touch them.
53 */
54 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
55
56 RemoteSource *source;
57
58 log_debug("Creating source for %sfd:%d (%s)",
59 passive_fd ? "passive " : "", fd, name);
60
61 assert(fd >= 0);
62
63 source = new0(RemoteSource, 1);
64 if (!source)
65 return NULL;
66
67 source->fd = fd;
68 source->passive_fd = passive_fd;
69 source->name = name;
70 source->writer = writer;
71
72 return source;
73 }
74
75 static char* realloc_buffer(RemoteSource *source, size_t size) {
76 char *b, *old = source->buf;
77
78 b = GREEDY_REALLOC(source->buf, source->size, size);
79 if (!b)
80 return NULL;
81
82 iovw_rebase(&source->iovw, old, source->buf);
83
84 return b;
85 }
86
87 static int get_line(RemoteSource *source, char **line, size_t *size) {
88 ssize_t n;
89 char *c = NULL;
90
91 assert(source);
92 assert(source->state == STATE_LINE);
93 assert(source->offset <= source->filled);
94 assert(source->filled <= source->size);
95 assert(source->buf == NULL || source->size > 0);
96 assert(source->fd >= 0);
97
98 for (;;) {
99 if (source->buf) {
100 size_t start = MAX(source->scanned, source->offset);
101
102 c = memchr(source->buf + start, '\n',
103 source->filled - start);
104 if (c != NULL)
105 break;
106 }
107
108 source->scanned = source->filled;
109 if (source->scanned >= DATA_SIZE_MAX) {
110 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
111 return -E2BIG;
112 }
113
114 if (source->passive_fd)
115 /* we have to wait for some data to come to us */
116 return -EAGAIN;
117
118 /* We know that source->filled is at most DATA_SIZE_MAX, so if
119 we reallocate it, we'll increase the size at least a bit. */
120 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
121 if (source->size - source->filled < LINE_CHUNK &&
122 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
123 return log_oom();
124
125 assert(source->buf);
126 assert(source->size - source->filled >= LINE_CHUNK ||
127 source->size == ENTRY_SIZE_MAX);
128
129 n = read(source->fd,
130 source->buf + source->filled,
131 source->size - source->filled);
132 if (n < 0) {
133 if (errno != EAGAIN)
134 log_error_errno(errno, "read(%d, ..., %zu): %m",
135 source->fd,
136 source->size - source->filled);
137 return -errno;
138 } else if (n == 0)
139 return 0;
140
141 source->filled += n;
142 }
143
144 *line = source->buf + source->offset;
145 *size = c + 1 - source->buf - source->offset;
146 source->offset += *size;
147
148 return 1;
149 }
150
151 int push_data(RemoteSource *source, const char *data, size_t size) {
152 assert(source);
153 assert(source->state != STATE_EOF);
154
155 if (!realloc_buffer(source, source->filled + size)) {
156 log_error("Failed to store received data of size %zu "
157 "(in addition to existing %zu bytes with %zu filled): %s",
158 size, source->size, source->filled, strerror(ENOMEM));
159 return -ENOMEM;
160 }
161
162 memcpy(source->buf + source->filled, data, size);
163 source->filled += size;
164
165 return 0;
166 }
167
168 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
169
170 assert(source);
171 assert(source->state == STATE_DATA_START ||
172 source->state == STATE_DATA ||
173 source->state == STATE_DATA_FINISH);
174 assert(size <= DATA_SIZE_MAX);
175 assert(source->offset <= source->filled);
176 assert(source->filled <= source->size);
177 assert(source->buf != NULL || source->size == 0);
178 assert(source->buf == NULL || source->size > 0);
179 assert(source->fd >= 0);
180 assert(data);
181
182 while (source->filled - source->offset < size) {
183 int n;
184
185 if (source->passive_fd)
186 /* we have to wait for some data to come to us */
187 return -EAGAIN;
188
189 if (!realloc_buffer(source, source->offset + size))
190 return log_oom();
191
192 n = read(source->fd, source->buf + source->filled,
193 source->size - source->filled);
194 if (n < 0) {
195 if (errno != EAGAIN)
196 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
197 source->size - source->filled);
198 return -errno;
199 } else if (n == 0)
200 return 0;
201
202 source->filled += n;
203 }
204
205 *data = source->buf + source->offset;
206 source->offset += size;
207
208 return 1;
209 }
210
211 static int get_data_size(RemoteSource *source) {
212 int r;
213 void *data;
214
215 assert(source);
216 assert(source->state == STATE_DATA_START);
217 assert(source->data_size == 0);
218
219 r = fill_fixed_size(source, &data, sizeof(uint64_t));
220 if (r <= 0)
221 return r;
222
223 source->data_size = le64toh( *(uint64_t *) data );
224 if (source->data_size > DATA_SIZE_MAX) {
225 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
226 source->data_size, DATA_SIZE_MAX);
227 return -EINVAL;
228 }
229 if (source->data_size == 0)
230 log_warning("Binary field with zero length");
231
232 return 1;
233 }
234
235 static int get_data_data(RemoteSource *source, void **data) {
236 int r;
237
238 assert(source);
239 assert(data);
240 assert(source->state == STATE_DATA);
241
242 r = fill_fixed_size(source, data, source->data_size);
243 if (r <= 0)
244 return r;
245
246 return 1;
247 }
248
249 static int get_data_newline(RemoteSource *source) {
250 int r;
251 char *data;
252
253 assert(source);
254 assert(source->state == STATE_DATA_FINISH);
255
256 r = fill_fixed_size(source, (void**) &data, 1);
257 if (r <= 0)
258 return r;
259
260 assert(data);
261 if (*data != '\n') {
262 log_error("expected newline, got '%c'", *data);
263 return -EINVAL;
264 }
265
266 return 1;
267 }
268
269 static int process_dunder(RemoteSource *source, char *line, size_t n) {
270 const char *timestamp;
271 int r;
272
273 assert(line);
274 assert(n > 0);
275 assert(line[n-1] == '\n');
276
277 /* XXX: is it worth to support timestamps in extended format?
278 * We don't produce them, but who knows... */
279
280 timestamp = startswith(line, "__CURSOR=");
281 if (timestamp)
282 /* ignore __CURSOR */
283 return 1;
284
285 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
286 if (timestamp) {
287 long long unsigned x;
288 line[n-1] = '\0';
289 r = safe_atollu(timestamp, &x);
290 if (r < 0)
291 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
292 else
293 source->ts.realtime = x;
294 return r < 0 ? r : 1;
295 }
296
297 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
298 if (timestamp) {
299 long long unsigned x;
300 line[n-1] = '\0';
301 r = safe_atollu(timestamp, &x);
302 if (r < 0)
303 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
304 else
305 source->ts.monotonic = x;
306 return r < 0 ? r : 1;
307 }
308
309 timestamp = startswith(line, "__");
310 if (timestamp) {
311 log_notice("Unknown dunder line %s", line);
312 return 1;
313 }
314
315 /* no dunder */
316 return 0;
317 }
318
319 static int process_data(RemoteSource *source) {
320 int r;
321
322 switch(source->state) {
323 case STATE_LINE: {
324 char *line, *sep;
325 size_t n = 0;
326
327 assert(source->data_size == 0);
328
329 r = get_line(source, &line, &n);
330 if (r < 0)
331 return r;
332 if (r == 0) {
333 source->state = STATE_EOF;
334 return r;
335 }
336 assert(n > 0);
337 assert(line[n-1] == '\n');
338
339 if (n == 1) {
340 log_trace("Received empty line, event is ready");
341 return 1;
342 }
343
344 r = process_dunder(source, line, n);
345 if (r != 0)
346 return r < 0 ? r : 0;
347
348 /* MESSAGE=xxx\n
349 or
350 COREDUMP\n
351 LLLLLLLL0011223344...\n
352 */
353 sep = memchr(line, '=', n);
354 if (sep) {
355 /* chomp newline */
356 n--;
357
358 r = iovw_put(&source->iovw, line, n);
359 if (r < 0)
360 return r;
361 } else {
362 /* replace \n with = */
363 line[n-1] = '=';
364
365 source->field_len = n;
366 source->state = STATE_DATA_START;
367
368 /* we cannot put the field in iovec until we have all data */
369 }
370
371 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
372
373 return 0; /* continue */
374 }
375
376 case STATE_DATA_START:
377 assert(source->data_size == 0);
378
379 r = get_data_size(source);
380 // log_debug("get_data_size() -> %d", r);
381 if (r < 0)
382 return r;
383 if (r == 0) {
384 source->state = STATE_EOF;
385 return 0;
386 }
387
388 source->state = source->data_size > 0 ?
389 STATE_DATA : STATE_DATA_FINISH;
390
391 return 0; /* continue */
392
393 case STATE_DATA: {
394 void *data;
395 char *field;
396
397 assert(source->data_size > 0);
398
399 r = get_data_data(source, &data);
400 // log_debug("get_data_data() -> %d", r);
401 if (r < 0)
402 return r;
403 if (r == 0) {
404 source->state = STATE_EOF;
405 return 0;
406 }
407
408 assert(data);
409
410 field = (char*) data - sizeof(uint64_t) - source->field_len;
411 memmove(field + sizeof(uint64_t), field, source->field_len);
412
413 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
414 if (r < 0)
415 return r;
416
417 source->state = STATE_DATA_FINISH;
418
419 return 0; /* continue */
420 }
421
422 case STATE_DATA_FINISH:
423 r = get_data_newline(source);
424 // log_debug("get_data_newline() -> %d", r);
425 if (r < 0)
426 return r;
427 if (r == 0) {
428 source->state = STATE_EOF;
429 return 0;
430 }
431
432 source->data_size = 0;
433 source->state = STATE_LINE;
434
435 return 0; /* continue */
436 default:
437 assert_not_reached("wtf?");
438 }
439 }
440
441 int process_source(RemoteSource *source, bool compress, bool seal) {
442 size_t remain, target;
443 int r;
444
445 assert(source);
446 assert(source->writer);
447
448 r = process_data(source);
449 if (r <= 0)
450 return r;
451
452 /* We have a full event */
453 log_trace("Received full event from source@%p fd:%d (%s)",
454 source, source->fd, source->name);
455
456 if (!source->iovw.count) {
457 log_warning("Entry with no payload, skipping");
458 goto freeing;
459 }
460
461 assert(source->iovw.iovec);
462 assert(source->iovw.count);
463
464 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
465 if (r < 0)
466 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
467 iovw_size(&source->iovw));
468 else
469 r = 1;
470
471 freeing:
472 iovw_free_contents(&source->iovw);
473
474 /* possibly reset buffer position */
475 remain = source->filled - source->offset;
476
477 if (remain == 0) /* no brainer */
478 source->offset = source->scanned = source->filled = 0;
479 else if (source->offset > source->size - source->filled &&
480 source->offset > remain) {
481 memcpy(source->buf, source->buf + source->offset, remain);
482 source->offset = source->scanned = 0;
483 source->filled = remain;
484 }
485
486 target = source->size;
487 while (target > 16 * LINE_CHUNK && remain < target / 2)
488 target /= 2;
489 if (target < source->size) {
490 char *tmp;
491
492 tmp = realloc(source->buf, target);
493 if (!tmp)
494 log_warning("Failed to reallocate buffer to (smaller) size %zu",
495 target);
496 else {
497 log_debug("Reallocated buffer from %zu to %zu bytes",
498 source->size, target);
499 source->buf = tmp;
500 source->size = target;
501 }
502 }
503
504 return r;
505 }