]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
util-lib: split string parsing related calls from util.[ch] into parse-util.[ch]
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "fd-util.h"
23 #include "journal-remote-parse.h"
24 #include "journald-native.h"
25 #include "parse-util.h"
26 #include "string-util.h"
27
28 #define LINE_CHUNK 8*1024u
29
30 void source_free(RemoteSource *source) {
31 if (!source)
32 return;
33
34 if (source->fd >= 0 && !source->passive_fd) {
35 log_debug("Closing fd:%d (%s)", source->fd, source->name);
36 safe_close(source->fd);
37 }
38
39 free(source->name);
40 free(source->buf);
41 iovw_free_contents(&source->iovw);
42
43 log_debug("Writer ref count %i", source->writer->n_ref);
44 writer_unref(source->writer);
45
46 sd_event_source_unref(source->event);
47 sd_event_source_unref(source->buffer_event);
48
49 free(source);
50 }
51
52 /**
53 * Initialize zero-filled source with given values. On success, takes
54 * ownerhship of fd and writer, otherwise does not touch them.
55 */
56 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
57
58 RemoteSource *source;
59
60 log_debug("Creating source for %sfd:%d (%s)",
61 passive_fd ? "passive " : "", fd, name);
62
63 assert(fd >= 0);
64
65 source = new0(RemoteSource, 1);
66 if (!source)
67 return NULL;
68
69 source->fd = fd;
70 source->passive_fd = passive_fd;
71 source->name = name;
72 source->writer = writer;
73
74 return source;
75 }
76
77 static char* realloc_buffer(RemoteSource *source, size_t size) {
78 char *b, *old = source->buf;
79
80 b = GREEDY_REALLOC(source->buf, source->size, size);
81 if (!b)
82 return NULL;
83
84 iovw_rebase(&source->iovw, old, source->buf);
85
86 return b;
87 }
88
89 static int get_line(RemoteSource *source, char **line, size_t *size) {
90 ssize_t n;
91 char *c = NULL;
92
93 assert(source);
94 assert(source->state == STATE_LINE);
95 assert(source->offset <= source->filled);
96 assert(source->filled <= source->size);
97 assert(source->buf == NULL || source->size > 0);
98 assert(source->fd >= 0);
99
100 for (;;) {
101 if (source->buf) {
102 size_t start = MAX(source->scanned, source->offset);
103
104 c = memchr(source->buf + start, '\n',
105 source->filled - start);
106 if (c != NULL)
107 break;
108 }
109
110 source->scanned = source->filled;
111 if (source->scanned >= DATA_SIZE_MAX) {
112 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
113 return -E2BIG;
114 }
115
116 if (source->passive_fd)
117 /* we have to wait for some data to come to us */
118 return -EAGAIN;
119
120 /* We know that source->filled is at most DATA_SIZE_MAX, so if
121 we reallocate it, we'll increase the size at least a bit. */
122 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
123 if (source->size - source->filled < LINE_CHUNK &&
124 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
125 return log_oom();
126
127 assert(source->buf);
128 assert(source->size - source->filled >= LINE_CHUNK ||
129 source->size == ENTRY_SIZE_MAX);
130
131 n = read(source->fd,
132 source->buf + source->filled,
133 source->size - source->filled);
134 if (n < 0) {
135 if (errno != EAGAIN)
136 log_error_errno(errno, "read(%d, ..., %zu): %m",
137 source->fd,
138 source->size - source->filled);
139 return -errno;
140 } else if (n == 0)
141 return 0;
142
143 source->filled += n;
144 }
145
146 *line = source->buf + source->offset;
147 *size = c + 1 - source->buf - source->offset;
148 source->offset += *size;
149
150 return 1;
151 }
152
153 int push_data(RemoteSource *source, const char *data, size_t size) {
154 assert(source);
155 assert(source->state != STATE_EOF);
156
157 if (!realloc_buffer(source, source->filled + size)) {
158 log_error("Failed to store received data of size %zu "
159 "(in addition to existing %zu bytes with %zu filled): %s",
160 size, source->size, source->filled, strerror(ENOMEM));
161 return -ENOMEM;
162 }
163
164 memcpy(source->buf + source->filled, data, size);
165 source->filled += size;
166
167 return 0;
168 }
169
170 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
171
172 assert(source);
173 assert(source->state == STATE_DATA_START ||
174 source->state == STATE_DATA ||
175 source->state == STATE_DATA_FINISH);
176 assert(size <= DATA_SIZE_MAX);
177 assert(source->offset <= source->filled);
178 assert(source->filled <= source->size);
179 assert(source->buf != NULL || source->size == 0);
180 assert(source->buf == NULL || source->size > 0);
181 assert(source->fd >= 0);
182 assert(data);
183
184 while (source->filled - source->offset < size) {
185 int n;
186
187 if (source->passive_fd)
188 /* we have to wait for some data to come to us */
189 return -EAGAIN;
190
191 if (!realloc_buffer(source, source->offset + size))
192 return log_oom();
193
194 n = read(source->fd, source->buf + source->filled,
195 source->size - source->filled);
196 if (n < 0) {
197 if (errno != EAGAIN)
198 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
199 source->size - source->filled);
200 return -errno;
201 } else if (n == 0)
202 return 0;
203
204 source->filled += n;
205 }
206
207 *data = source->buf + source->offset;
208 source->offset += size;
209
210 return 1;
211 }
212
213 static int get_data_size(RemoteSource *source) {
214 int r;
215 void *data;
216
217 assert(source);
218 assert(source->state == STATE_DATA_START);
219 assert(source->data_size == 0);
220
221 r = fill_fixed_size(source, &data, sizeof(uint64_t));
222 if (r <= 0)
223 return r;
224
225 source->data_size = le64toh( *(uint64_t *) data );
226 if (source->data_size > DATA_SIZE_MAX) {
227 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
228 source->data_size, DATA_SIZE_MAX);
229 return -EINVAL;
230 }
231 if (source->data_size == 0)
232 log_warning("Binary field with zero length");
233
234 return 1;
235 }
236
237 static int get_data_data(RemoteSource *source, void **data) {
238 int r;
239
240 assert(source);
241 assert(data);
242 assert(source->state == STATE_DATA);
243
244 r = fill_fixed_size(source, data, source->data_size);
245 if (r <= 0)
246 return r;
247
248 return 1;
249 }
250
251 static int get_data_newline(RemoteSource *source) {
252 int r;
253 char *data;
254
255 assert(source);
256 assert(source->state == STATE_DATA_FINISH);
257
258 r = fill_fixed_size(source, (void**) &data, 1);
259 if (r <= 0)
260 return r;
261
262 assert(data);
263 if (*data != '\n') {
264 log_error("expected newline, got '%c'", *data);
265 return -EINVAL;
266 }
267
268 return 1;
269 }
270
271 static int process_dunder(RemoteSource *source, char *line, size_t n) {
272 const char *timestamp;
273 int r;
274
275 assert(line);
276 assert(n > 0);
277 assert(line[n-1] == '\n');
278
279 /* XXX: is it worth to support timestamps in extended format?
280 * We don't produce them, but who knows... */
281
282 timestamp = startswith(line, "__CURSOR=");
283 if (timestamp)
284 /* ignore __CURSOR */
285 return 1;
286
287 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
288 if (timestamp) {
289 long long unsigned x;
290 line[n-1] = '\0';
291 r = safe_atollu(timestamp, &x);
292 if (r < 0)
293 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
294 else
295 source->ts.realtime = x;
296 return r < 0 ? r : 1;
297 }
298
299 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
300 if (timestamp) {
301 long long unsigned x;
302 line[n-1] = '\0';
303 r = safe_atollu(timestamp, &x);
304 if (r < 0)
305 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
306 else
307 source->ts.monotonic = x;
308 return r < 0 ? r : 1;
309 }
310
311 timestamp = startswith(line, "__");
312 if (timestamp) {
313 log_notice("Unknown dunder line %s", line);
314 return 1;
315 }
316
317 /* no dunder */
318 return 0;
319 }
320
321 static int process_data(RemoteSource *source) {
322 int r;
323
324 switch(source->state) {
325 case STATE_LINE: {
326 char *line, *sep;
327 size_t n = 0;
328
329 assert(source->data_size == 0);
330
331 r = get_line(source, &line, &n);
332 if (r < 0)
333 return r;
334 if (r == 0) {
335 source->state = STATE_EOF;
336 return r;
337 }
338 assert(n > 0);
339 assert(line[n-1] == '\n');
340
341 if (n == 1) {
342 log_trace("Received empty line, event is ready");
343 return 1;
344 }
345
346 r = process_dunder(source, line, n);
347 if (r != 0)
348 return r < 0 ? r : 0;
349
350 /* MESSAGE=xxx\n
351 or
352 COREDUMP\n
353 LLLLLLLL0011223344...\n
354 */
355 sep = memchr(line, '=', n);
356 if (sep) {
357 /* chomp newline */
358 n--;
359
360 r = iovw_put(&source->iovw, line, n);
361 if (r < 0)
362 return r;
363 } else {
364 /* replace \n with = */
365 line[n-1] = '=';
366
367 source->field_len = n;
368 source->state = STATE_DATA_START;
369
370 /* we cannot put the field in iovec until we have all data */
371 }
372
373 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
374
375 return 0; /* continue */
376 }
377
378 case STATE_DATA_START:
379 assert(source->data_size == 0);
380
381 r = get_data_size(source);
382 // log_debug("get_data_size() -> %d", r);
383 if (r < 0)
384 return r;
385 if (r == 0) {
386 source->state = STATE_EOF;
387 return 0;
388 }
389
390 source->state = source->data_size > 0 ?
391 STATE_DATA : STATE_DATA_FINISH;
392
393 return 0; /* continue */
394
395 case STATE_DATA: {
396 void *data;
397 char *field;
398
399 assert(source->data_size > 0);
400
401 r = get_data_data(source, &data);
402 // log_debug("get_data_data() -> %d", r);
403 if (r < 0)
404 return r;
405 if (r == 0) {
406 source->state = STATE_EOF;
407 return 0;
408 }
409
410 assert(data);
411
412 field = (char*) data - sizeof(uint64_t) - source->field_len;
413 memmove(field + sizeof(uint64_t), field, source->field_len);
414
415 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
416 if (r < 0)
417 return r;
418
419 source->state = STATE_DATA_FINISH;
420
421 return 0; /* continue */
422 }
423
424 case STATE_DATA_FINISH:
425 r = get_data_newline(source);
426 // log_debug("get_data_newline() -> %d", r);
427 if (r < 0)
428 return r;
429 if (r == 0) {
430 source->state = STATE_EOF;
431 return 0;
432 }
433
434 source->data_size = 0;
435 source->state = STATE_LINE;
436
437 return 0; /* continue */
438 default:
439 assert_not_reached("wtf?");
440 }
441 }
442
443 int process_source(RemoteSource *source, bool compress, bool seal) {
444 size_t remain, target;
445 int r;
446
447 assert(source);
448 assert(source->writer);
449
450 r = process_data(source);
451 if (r <= 0)
452 return r;
453
454 /* We have a full event */
455 log_trace("Received full event from source@%p fd:%d (%s)",
456 source, source->fd, source->name);
457
458 if (!source->iovw.count) {
459 log_warning("Entry with no payload, skipping");
460 goto freeing;
461 }
462
463 assert(source->iovw.iovec);
464 assert(source->iovw.count);
465
466 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
467 if (r < 0)
468 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
469 iovw_size(&source->iovw));
470 else
471 r = 1;
472
473 freeing:
474 iovw_free_contents(&source->iovw);
475
476 /* possibly reset buffer position */
477 remain = source->filled - source->offset;
478
479 if (remain == 0) /* no brainer */
480 source->offset = source->scanned = source->filled = 0;
481 else if (source->offset > source->size - source->filled &&
482 source->offset > remain) {
483 memcpy(source->buf, source->buf + source->offset, remain);
484 source->offset = source->scanned = 0;
485 source->filled = remain;
486 }
487
488 target = source->size;
489 while (target > 16 * LINE_CHUNK && remain < target / 2)
490 target /= 2;
491 if (target < source->size) {
492 char *tmp;
493
494 tmp = realloc(source->buf, target);
495 if (!tmp)
496 log_warning("Failed to reallocate buffer to (smaller) size %zu",
497 target);
498 else {
499 log_debug("Reallocated buffer from %zu to %zu bytes",
500 source->size, target);
501 source->buf = tmp;
502 source->size = target;
503 }
504 }
505
506 return r;
507 }