]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/basic/journal-importer.c
Merge pull request #8575 from keszybz/non-absolute-paths
[thirdparty/systemd.git] / src / basic / journal-importer.c
CommitLineData
53e1b683 1/* SPDX-License-Identifier: LGPL-2.1+ */
b18453ed
ZJS
2/***
3 This file is part of systemd.
4
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
b18453ed
ZJS
6***/
7
dccca82b 8#include <errno.h>
b18453ed
ZJS
9#include <unistd.h>
10
11#include "alloc-util.h"
b18453ed 12#include "fd-util.h"
e6a7ec4b
LP
13#include "io-util.h"
14#include "journal-importer.h"
b18453ed
ZJS
15#include "parse-util.h"
16#include "string-util.h"
f652c62d 17#include "unaligned.h"
b18453ed
ZJS
18
/* States of the importer's parser; journal_importer_process_data() switches on imp->state. */
enum {
        IMPORTER_STATE_LINE        = 0, /* idle, or in the middle of reading a text line */
        IMPORTER_STATE_DATA_START  = 1, /* reading the size header of a binary field */
        IMPORTER_STATE_DATA        = 2, /* reading the payload of a binary field */
        IMPORTER_STATE_DATA_FINISH = 3, /* expecting the newline that ends a binary field */
        IMPORTER_STATE_EOF         = 4, /* input exhausted, nothing more to do */
};
26
27static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
28 if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
29 return log_oom();
30
e6a7ec4b 31 iovw->iovec[iovw->count++] = IOVEC_MAKE(data, len);
b18453ed
ZJS
32 return 0;
33}
34
35static void iovw_free_contents(struct iovec_wrapper *iovw) {
36 iovw->iovec = mfree(iovw->iovec);
37 iovw->size_bytes = iovw->count = 0;
38}
39
40static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
41 size_t i;
42
43 for (i = 0; i < iovw->count; i++)
44 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
45}
46
47size_t iovw_size(struct iovec_wrapper *iovw) {
48 size_t n = 0, i;
49
50 for (i = 0; i < iovw->count; i++)
51 n += iovw->iovec[i].iov_len;
52
53 return n;
54}
55
56void journal_importer_cleanup(JournalImporter *imp) {
57 if (imp->fd >= 0 && !imp->passive_fd) {
58 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
59 safe_close(imp->fd);
60 }
61
2ddb70d2 62 free(imp->name);
b18453ed
ZJS
63 free(imp->buf);
64 iovw_free_contents(&imp->iovw);
65}
66
67static char* realloc_buffer(JournalImporter *imp, size_t size) {
68 char *b, *old = imp->buf;
69
70 b = GREEDY_REALLOC(imp->buf, imp->size, size);
71 if (!b)
72 return NULL;
73
74 iovw_rebase(&imp->iovw, old, imp->buf);
75
76 return b;
77}
78
79static int get_line(JournalImporter *imp, char **line, size_t *size) {
80 ssize_t n;
81 char *c = NULL;
82
83 assert(imp);
84 assert(imp->state == IMPORTER_STATE_LINE);
85 assert(imp->offset <= imp->filled);
86 assert(imp->filled <= imp->size);
234519ae 87 assert(!imp->buf || imp->size > 0);
b18453ed
ZJS
88 assert(imp->fd >= 0);
89
90 for (;;) {
91 if (imp->buf) {
92 size_t start = MAX(imp->scanned, imp->offset);
93
94 c = memchr(imp->buf + start, '\n',
95 imp->filled - start);
96 if (c != NULL)
97 break;
98 }
99
100 imp->scanned = imp->filled;
101 if (imp->scanned >= DATA_SIZE_MAX) {
102 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
103 return -E2BIG;
104 }
105
106 if (imp->passive_fd)
107 /* we have to wait for some data to come to us */
108 return -EAGAIN;
109
110 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
111 we reallocate it, we'll increase the size at least a bit. */
112 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
113 if (imp->size - imp->filled < LINE_CHUNK &&
114 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
115 return log_oom();
116
117 assert(imp->buf);
118 assert(imp->size - imp->filled >= LINE_CHUNK ||
119 imp->size == ENTRY_SIZE_MAX);
120
121 n = read(imp->fd,
122 imp->buf + imp->filled,
123 imp->size - imp->filled);
124 if (n < 0) {
125 if (errno != EAGAIN)
126 log_error_errno(errno, "read(%d, ..., %zu): %m",
127 imp->fd,
128 imp->size - imp->filled);
129 return -errno;
130 } else if (n == 0)
131 return 0;
132
133 imp->filled += n;
134 }
135
136 *line = imp->buf + imp->offset;
137 *size = c + 1 - imp->buf - imp->offset;
138 imp->offset += *size;
139
140 return 1;
141}
142
143static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
144
145 assert(imp);
3742095b 146 assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
b18453ed
ZJS
147 assert(size <= DATA_SIZE_MAX);
148 assert(imp->offset <= imp->filled);
149 assert(imp->filled <= imp->size);
234519ae
LP
150 assert(imp->buf || imp->size == 0);
151 assert(!imp->buf || imp->size > 0);
b18453ed
ZJS
152 assert(imp->fd >= 0);
153 assert(data);
154
155 while (imp->filled - imp->offset < size) {
156 int n;
157
158 if (imp->passive_fd)
159 /* we have to wait for some data to come to us */
160 return -EAGAIN;
161
162 if (!realloc_buffer(imp, imp->offset + size))
163 return log_oom();
164
165 n = read(imp->fd, imp->buf + imp->filled,
166 imp->size - imp->filled);
167 if (n < 0) {
168 if (errno != EAGAIN)
169 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
170 imp->size - imp->filled);
171 return -errno;
172 } else if (n == 0)
173 return 0;
174
175 imp->filled += n;
176 }
177
178 *data = imp->buf + imp->offset;
179 imp->offset += size;
180
181 return 1;
182}
183
184static int get_data_size(JournalImporter *imp) {
185 int r;
186 void *data;
187
188 assert(imp);
189 assert(imp->state == IMPORTER_STATE_DATA_START);
190 assert(imp->data_size == 0);
191
192 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
193 if (r <= 0)
194 return r;
195
f652c62d 196 imp->data_size = unaligned_read_le64(data);
b18453ed
ZJS
197 if (imp->data_size > DATA_SIZE_MAX) {
198 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
199 imp->data_size, DATA_SIZE_MAX);
200 return -EINVAL;
201 }
202 if (imp->data_size == 0)
203 log_warning("Binary field with zero length");
204
205 return 1;
206}
207
208static int get_data_data(JournalImporter *imp, void **data) {
209 int r;
210
211 assert(imp);
212 assert(data);
213 assert(imp->state == IMPORTER_STATE_DATA);
214
215 r = fill_fixed_size(imp, data, imp->data_size);
216 if (r <= 0)
217 return r;
218
219 return 1;
220}
221
222static int get_data_newline(JournalImporter *imp) {
223 int r;
224 char *data;
225
226 assert(imp);
227 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
228
229 r = fill_fixed_size(imp, (void**) &data, 1);
230 if (r <= 0)
231 return r;
232
233 assert(data);
234 if (*data != '\n') {
235 log_error("expected newline, got '%c'", *data);
236 return -EINVAL;
237 }
238
239 return 1;
240}
241
242static int process_dunder(JournalImporter *imp, char *line, size_t n) {
243 const char *timestamp;
244 int r;
245
246 assert(line);
247 assert(n > 0);
248 assert(line[n-1] == '\n');
249
250 /* XXX: is it worth to support timestamps in extended format?
251 * We don't produce them, but who knows... */
252
253 timestamp = startswith(line, "__CURSOR=");
254 if (timestamp)
255 /* ignore __CURSOR */
256 return 1;
257
258 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
259 if (timestamp) {
260 long long unsigned x;
261 line[n-1] = '\0';
262 r = safe_atollu(timestamp, &x);
263 if (r < 0)
264 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
265 else
266 imp->ts.realtime = x;
267 return r < 0 ? r : 1;
268 }
269
270 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
271 if (timestamp) {
272 long long unsigned x;
273 line[n-1] = '\0';
274 r = safe_atollu(timestamp, &x);
275 if (r < 0)
276 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
277 else
278 imp->ts.monotonic = x;
279 return r < 0 ? r : 1;
280 }
281
282 timestamp = startswith(line, "__");
283 if (timestamp) {
284 log_notice("Unknown dunder line %s", line);
285 return 1;
286 }
287
288 /* no dunder */
289 return 0;
290}
291
292int journal_importer_process_data(JournalImporter *imp) {
293 int r;
294
295 switch(imp->state) {
296 case IMPORTER_STATE_LINE: {
297 char *line, *sep;
298 size_t n = 0;
299
300 assert(imp->data_size == 0);
301
302 r = get_line(imp, &line, &n);
303 if (r < 0)
304 return r;
305 if (r == 0) {
306 imp->state = IMPORTER_STATE_EOF;
d74dc4f2 307 return 0;
b18453ed
ZJS
308 }
309 assert(n > 0);
310 assert(line[n-1] == '\n');
311
312 if (n == 1) {
313 log_trace("Received empty line, event is ready");
314 return 1;
315 }
316
317 r = process_dunder(imp, line, n);
318 if (r != 0)
319 return r < 0 ? r : 0;
320
321 /* MESSAGE=xxx\n
322 or
323 COREDUMP\n
324 LLLLLLLL0011223344...\n
325 */
326 sep = memchr(line, '=', n);
327 if (sep) {
328 /* chomp newline */
329 n--;
330
331 r = iovw_put(&imp->iovw, line, n);
332 if (r < 0)
333 return r;
334 } else {
335 /* replace \n with = */
336 line[n-1] = '=';
337
338 imp->field_len = n;
339 imp->state = IMPORTER_STATE_DATA_START;
340
341 /* we cannot put the field in iovec until we have all data */
342 }
343
344 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
345
346 return 0; /* continue */
347 }
348
349 case IMPORTER_STATE_DATA_START:
350 assert(imp->data_size == 0);
351
352 r = get_data_size(imp);
353 // log_debug("get_data_size() -> %d", r);
354 if (r < 0)
355 return r;
356 if (r == 0) {
357 imp->state = IMPORTER_STATE_EOF;
358 return 0;
359 }
360
361 imp->state = imp->data_size > 0 ?
362 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
363
364 return 0; /* continue */
365
366 case IMPORTER_STATE_DATA: {
367 void *data;
368 char *field;
369
370 assert(imp->data_size > 0);
371
372 r = get_data_data(imp, &data);
373 // log_debug("get_data_data() -> %d", r);
374 if (r < 0)
375 return r;
376 if (r == 0) {
377 imp->state = IMPORTER_STATE_EOF;
378 return 0;
379 }
380
381 assert(data);
382
383 field = (char*) data - sizeof(uint64_t) - imp->field_len;
384 memmove(field + sizeof(uint64_t), field, imp->field_len);
385
386 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
387 if (r < 0)
388 return r;
389
390 imp->state = IMPORTER_STATE_DATA_FINISH;
391
392 return 0; /* continue */
393 }
394
395 case IMPORTER_STATE_DATA_FINISH:
396 r = get_data_newline(imp);
397 // log_debug("get_data_newline() -> %d", r);
398 if (r < 0)
399 return r;
400 if (r == 0) {
401 imp->state = IMPORTER_STATE_EOF;
402 return 0;
403 }
404
405 imp->data_size = 0;
406 imp->state = IMPORTER_STATE_LINE;
407
408 return 0; /* continue */
409 default:
410 assert_not_reached("wtf?");
411 }
412}
413
414int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
415 assert(imp);
416 assert(imp->state != IMPORTER_STATE_EOF);
417
418 if (!realloc_buffer(imp, imp->filled + size)) {
419 log_error("Failed to store received data of size %zu "
420 "(in addition to existing %zu bytes with %zu filled): %s",
421 size, imp->size, imp->filled, strerror(ENOMEM));
422 return -ENOMEM;
423 }
424
425 memcpy(imp->buf + imp->filled, data, size);
426 imp->filled += size;
427
428 return 0;
429}
430
431void journal_importer_drop_iovw(JournalImporter *imp) {
432 size_t remain, target;
433
434 /* This function drops processed data that along with the iovw that points at it */
435
436 iovw_free_contents(&imp->iovw);
437
438 /* possibly reset buffer position */
439 remain = imp->filled - imp->offset;
440
441 if (remain == 0) /* no brainer */
442 imp->offset = imp->scanned = imp->filled = 0;
443 else if (imp->offset > imp->size - imp->filled &&
444 imp->offset > remain) {
445 memcpy(imp->buf, imp->buf + imp->offset, remain);
446 imp->offset = imp->scanned = 0;
447 imp->filled = remain;
448 }
449
450 target = imp->size;
451 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
452 target /= 2;
453 if (target < imp->size) {
454 char *tmp;
455
456 tmp = realloc(imp->buf, target);
457 if (!tmp)
458 log_warning("Failed to reallocate buffer to (smaller) size %zu",
459 target);
460 else {
461 log_debug("Reallocated buffer from %zu to %zu bytes",
462 imp->size, target);
463 imp->buf = tmp;
464 imp->size = target;
465 }
466 }
467}
468
469bool journal_importer_eof(const JournalImporter *imp) {
470 return imp->state == IMPORTER_STATE_EOF;
471}