]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/basic/journal-importer.c
Add SPDX license identifiers to source files under the LGPL
[thirdparty/systemd.git] / src / basic / journal-importer.c
CommitLineData
53e1b683 1/* SPDX-License-Identifier: LGPL-2.1+ */
b18453ed
ZJS
2/***
3 This file is part of systemd.
4
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
6
7 systemd is free software; you can redistribute it and/or modify it
8 under the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 2.1 of the License, or
10 (at your option) any later version.
11
12 systemd is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with systemd; If not, see <http://www.gnu.org/licenses/>.
19***/
20
21#include <unistd.h>
22
23#include "alloc-util.h"
b18453ed 24#include "fd-util.h"
e6a7ec4b
LP
25#include "io-util.h"
26#include "journal-importer.h"
b18453ed
ZJS
27#include "parse-util.h"
28#include "string-util.h"
f652c62d 29#include "unaligned.h"
b18453ed
ZJS
30
31enum {
32 IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
33 IMPORTER_STATE_DATA_START, /* reading binary data header */
34 IMPORTER_STATE_DATA, /* reading binary data */
35 IMPORTER_STATE_DATA_FINISH, /* expecting newline */
36 IMPORTER_STATE_EOF, /* done */
37};
38
39static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
40 if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
41 return log_oom();
42
e6a7ec4b 43 iovw->iovec[iovw->count++] = IOVEC_MAKE(data, len);
b18453ed
ZJS
44 return 0;
45}
46
47static void iovw_free_contents(struct iovec_wrapper *iovw) {
48 iovw->iovec = mfree(iovw->iovec);
49 iovw->size_bytes = iovw->count = 0;
50}
51
52static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
53 size_t i;
54
55 for (i = 0; i < iovw->count; i++)
56 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
57}
58
59size_t iovw_size(struct iovec_wrapper *iovw) {
60 size_t n = 0, i;
61
62 for (i = 0; i < iovw->count; i++)
63 n += iovw->iovec[i].iov_len;
64
65 return n;
66}
67
68void journal_importer_cleanup(JournalImporter *imp) {
69 if (imp->fd >= 0 && !imp->passive_fd) {
70 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
71 safe_close(imp->fd);
72 }
73
2ddb70d2 74 free(imp->name);
b18453ed
ZJS
75 free(imp->buf);
76 iovw_free_contents(&imp->iovw);
77}
78
79static char* realloc_buffer(JournalImporter *imp, size_t size) {
80 char *b, *old = imp->buf;
81
82 b = GREEDY_REALLOC(imp->buf, imp->size, size);
83 if (!b)
84 return NULL;
85
86 iovw_rebase(&imp->iovw, old, imp->buf);
87
88 return b;
89}
90
91static int get_line(JournalImporter *imp, char **line, size_t *size) {
92 ssize_t n;
93 char *c = NULL;
94
95 assert(imp);
96 assert(imp->state == IMPORTER_STATE_LINE);
97 assert(imp->offset <= imp->filled);
98 assert(imp->filled <= imp->size);
99 assert(imp->buf == NULL || imp->size > 0);
100 assert(imp->fd >= 0);
101
102 for (;;) {
103 if (imp->buf) {
104 size_t start = MAX(imp->scanned, imp->offset);
105
106 c = memchr(imp->buf + start, '\n',
107 imp->filled - start);
108 if (c != NULL)
109 break;
110 }
111
112 imp->scanned = imp->filled;
113 if (imp->scanned >= DATA_SIZE_MAX) {
114 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
115 return -E2BIG;
116 }
117
118 if (imp->passive_fd)
119 /* we have to wait for some data to come to us */
120 return -EAGAIN;
121
122 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
123 we reallocate it, we'll increase the size at least a bit. */
124 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
125 if (imp->size - imp->filled < LINE_CHUNK &&
126 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
127 return log_oom();
128
129 assert(imp->buf);
130 assert(imp->size - imp->filled >= LINE_CHUNK ||
131 imp->size == ENTRY_SIZE_MAX);
132
133 n = read(imp->fd,
134 imp->buf + imp->filled,
135 imp->size - imp->filled);
136 if (n < 0) {
137 if (errno != EAGAIN)
138 log_error_errno(errno, "read(%d, ..., %zu): %m",
139 imp->fd,
140 imp->size - imp->filled);
141 return -errno;
142 } else if (n == 0)
143 return 0;
144
145 imp->filled += n;
146 }
147
148 *line = imp->buf + imp->offset;
149 *size = c + 1 - imp->buf - imp->offset;
150 imp->offset += *size;
151
152 return 1;
153}
154
155static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
156
157 assert(imp);
3742095b 158 assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
b18453ed
ZJS
159 assert(size <= DATA_SIZE_MAX);
160 assert(imp->offset <= imp->filled);
161 assert(imp->filled <= imp->size);
162 assert(imp->buf != NULL || imp->size == 0);
163 assert(imp->buf == NULL || imp->size > 0);
164 assert(imp->fd >= 0);
165 assert(data);
166
167 while (imp->filled - imp->offset < size) {
168 int n;
169
170 if (imp->passive_fd)
171 /* we have to wait for some data to come to us */
172 return -EAGAIN;
173
174 if (!realloc_buffer(imp, imp->offset + size))
175 return log_oom();
176
177 n = read(imp->fd, imp->buf + imp->filled,
178 imp->size - imp->filled);
179 if (n < 0) {
180 if (errno != EAGAIN)
181 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
182 imp->size - imp->filled);
183 return -errno;
184 } else if (n == 0)
185 return 0;
186
187 imp->filled += n;
188 }
189
190 *data = imp->buf + imp->offset;
191 imp->offset += size;
192
193 return 1;
194}
195
196static int get_data_size(JournalImporter *imp) {
197 int r;
198 void *data;
199
200 assert(imp);
201 assert(imp->state == IMPORTER_STATE_DATA_START);
202 assert(imp->data_size == 0);
203
204 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
205 if (r <= 0)
206 return r;
207
f652c62d 208 imp->data_size = unaligned_read_le64(data);
b18453ed
ZJS
209 if (imp->data_size > DATA_SIZE_MAX) {
210 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
211 imp->data_size, DATA_SIZE_MAX);
212 return -EINVAL;
213 }
214 if (imp->data_size == 0)
215 log_warning("Binary field with zero length");
216
217 return 1;
218}
219
220static int get_data_data(JournalImporter *imp, void **data) {
221 int r;
222
223 assert(imp);
224 assert(data);
225 assert(imp->state == IMPORTER_STATE_DATA);
226
227 r = fill_fixed_size(imp, data, imp->data_size);
228 if (r <= 0)
229 return r;
230
231 return 1;
232}
233
234static int get_data_newline(JournalImporter *imp) {
235 int r;
236 char *data;
237
238 assert(imp);
239 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
240
241 r = fill_fixed_size(imp, (void**) &data, 1);
242 if (r <= 0)
243 return r;
244
245 assert(data);
246 if (*data != '\n') {
247 log_error("expected newline, got '%c'", *data);
248 return -EINVAL;
249 }
250
251 return 1;
252}
253
254static int process_dunder(JournalImporter *imp, char *line, size_t n) {
255 const char *timestamp;
256 int r;
257
258 assert(line);
259 assert(n > 0);
260 assert(line[n-1] == '\n');
261
262 /* XXX: is it worth to support timestamps in extended format?
263 * We don't produce them, but who knows... */
264
265 timestamp = startswith(line, "__CURSOR=");
266 if (timestamp)
267 /* ignore __CURSOR */
268 return 1;
269
270 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
271 if (timestamp) {
272 long long unsigned x;
273 line[n-1] = '\0';
274 r = safe_atollu(timestamp, &x);
275 if (r < 0)
276 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
277 else
278 imp->ts.realtime = x;
279 return r < 0 ? r : 1;
280 }
281
282 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
283 if (timestamp) {
284 long long unsigned x;
285 line[n-1] = '\0';
286 r = safe_atollu(timestamp, &x);
287 if (r < 0)
288 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
289 else
290 imp->ts.monotonic = x;
291 return r < 0 ? r : 1;
292 }
293
294 timestamp = startswith(line, "__");
295 if (timestamp) {
296 log_notice("Unknown dunder line %s", line);
297 return 1;
298 }
299
300 /* no dunder */
301 return 0;
302}
303
304int journal_importer_process_data(JournalImporter *imp) {
305 int r;
306
307 switch(imp->state) {
308 case IMPORTER_STATE_LINE: {
309 char *line, *sep;
310 size_t n = 0;
311
312 assert(imp->data_size == 0);
313
314 r = get_line(imp, &line, &n);
315 if (r < 0)
316 return r;
317 if (r == 0) {
318 imp->state = IMPORTER_STATE_EOF;
d74dc4f2 319 return 0;
b18453ed
ZJS
320 }
321 assert(n > 0);
322 assert(line[n-1] == '\n');
323
324 if (n == 1) {
325 log_trace("Received empty line, event is ready");
326 return 1;
327 }
328
329 r = process_dunder(imp, line, n);
330 if (r != 0)
331 return r < 0 ? r : 0;
332
333 /* MESSAGE=xxx\n
334 or
335 COREDUMP\n
336 LLLLLLLL0011223344...\n
337 */
338 sep = memchr(line, '=', n);
339 if (sep) {
340 /* chomp newline */
341 n--;
342
343 r = iovw_put(&imp->iovw, line, n);
344 if (r < 0)
345 return r;
346 } else {
347 /* replace \n with = */
348 line[n-1] = '=';
349
350 imp->field_len = n;
351 imp->state = IMPORTER_STATE_DATA_START;
352
353 /* we cannot put the field in iovec until we have all data */
354 }
355
356 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
357
358 return 0; /* continue */
359 }
360
361 case IMPORTER_STATE_DATA_START:
362 assert(imp->data_size == 0);
363
364 r = get_data_size(imp);
365 // log_debug("get_data_size() -> %d", r);
366 if (r < 0)
367 return r;
368 if (r == 0) {
369 imp->state = IMPORTER_STATE_EOF;
370 return 0;
371 }
372
373 imp->state = imp->data_size > 0 ?
374 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
375
376 return 0; /* continue */
377
378 case IMPORTER_STATE_DATA: {
379 void *data;
380 char *field;
381
382 assert(imp->data_size > 0);
383
384 r = get_data_data(imp, &data);
385 // log_debug("get_data_data() -> %d", r);
386 if (r < 0)
387 return r;
388 if (r == 0) {
389 imp->state = IMPORTER_STATE_EOF;
390 return 0;
391 }
392
393 assert(data);
394
395 field = (char*) data - sizeof(uint64_t) - imp->field_len;
396 memmove(field + sizeof(uint64_t), field, imp->field_len);
397
398 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
399 if (r < 0)
400 return r;
401
402 imp->state = IMPORTER_STATE_DATA_FINISH;
403
404 return 0; /* continue */
405 }
406
407 case IMPORTER_STATE_DATA_FINISH:
408 r = get_data_newline(imp);
409 // log_debug("get_data_newline() -> %d", r);
410 if (r < 0)
411 return r;
412 if (r == 0) {
413 imp->state = IMPORTER_STATE_EOF;
414 return 0;
415 }
416
417 imp->data_size = 0;
418 imp->state = IMPORTER_STATE_LINE;
419
420 return 0; /* continue */
421 default:
422 assert_not_reached("wtf?");
423 }
424}
425
426int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
427 assert(imp);
428 assert(imp->state != IMPORTER_STATE_EOF);
429
430 if (!realloc_buffer(imp, imp->filled + size)) {
431 log_error("Failed to store received data of size %zu "
432 "(in addition to existing %zu bytes with %zu filled): %s",
433 size, imp->size, imp->filled, strerror(ENOMEM));
434 return -ENOMEM;
435 }
436
437 memcpy(imp->buf + imp->filled, data, size);
438 imp->filled += size;
439
440 return 0;
441}
442
443void journal_importer_drop_iovw(JournalImporter *imp) {
444 size_t remain, target;
445
446 /* This function drops processed data that along with the iovw that points at it */
447
448 iovw_free_contents(&imp->iovw);
449
450 /* possibly reset buffer position */
451 remain = imp->filled - imp->offset;
452
453 if (remain == 0) /* no brainer */
454 imp->offset = imp->scanned = imp->filled = 0;
455 else if (imp->offset > imp->size - imp->filled &&
456 imp->offset > remain) {
457 memcpy(imp->buf, imp->buf + imp->offset, remain);
458 imp->offset = imp->scanned = 0;
459 imp->filled = remain;
460 }
461
462 target = imp->size;
463 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
464 target /= 2;
465 if (target < imp->size) {
466 char *tmp;
467
468 tmp = realloc(imp->buf, target);
469 if (!tmp)
470 log_warning("Failed to reallocate buffer to (smaller) size %zu",
471 target);
472 else {
473 log_debug("Reallocated buffer from %zu to %zu bytes",
474 imp->size, target);
475 imp->buf = tmp;
476 imp->size = target;
477 }
478 }
479}
480
481bool journal_importer_eof(const JournalImporter *imp) {
482 return imp->state == IMPORTER_STATE_EOF;
483}