]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/basic/journal-importer.c
log: minimize includes in log.h
[thirdparty/systemd.git] / src / basic / journal-importer.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2 /***
3 This file is part of systemd.
4
5 Copyright 2014 Zbigniew Jędrzejewski-Szmek
6
7 systemd is free software; you can redistribute it and/or modify it
8 under the terms of the GNU Lesser General Public License as published by
9 the Free Software Foundation; either version 2.1 of the License, or
10 (at your option) any later version.
11
12 systemd is distributed in the hope that it will be useful, but
13 WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 Lesser General Public License for more details.
16
17 You should have received a copy of the GNU Lesser General Public License
18 along with systemd; If not, see <http://www.gnu.org/licenses/>.
19 ***/
20
21 #include <errno.h>
22 #include <unistd.h>
23
24 #include "alloc-util.h"
25 #include "fd-util.h"
26 #include "io-util.h"
27 #include "journal-importer.h"
28 #include "parse-util.h"
29 #include "string-util.h"
30 #include "unaligned.h"
31
/* States of the importer's parsing state machine, in the order a binary field walks through them. */
enum {
        IMPORTER_STATE_LINE        = 0, /* idle, or in the middle of reading a text line */
        IMPORTER_STATE_DATA_START  = 1, /* reading the header that precedes binary data */
        IMPORTER_STATE_DATA        = 2, /* reading the binary data itself */
        IMPORTER_STATE_DATA_FINISH = 3, /* expecting the newline that follows binary data */
        IMPORTER_STATE_EOF         = 4, /* input is exhausted */
};
39
40 static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
41 if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
42 return log_oom();
43
44 iovw->iovec[iovw->count++] = IOVEC_MAKE(data, len);
45 return 0;
46 }
47
48 static void iovw_free_contents(struct iovec_wrapper *iovw) {
49 iovw->iovec = mfree(iovw->iovec);
50 iovw->size_bytes = iovw->count = 0;
51 }
52
53 static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
54 size_t i;
55
56 for (i = 0; i < iovw->count; i++)
57 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
58 }
59
60 size_t iovw_size(struct iovec_wrapper *iovw) {
61 size_t n = 0, i;
62
63 for (i = 0; i < iovw->count; i++)
64 n += iovw->iovec[i].iov_len;
65
66 return n;
67 }
68
69 void journal_importer_cleanup(JournalImporter *imp) {
70 if (imp->fd >= 0 && !imp->passive_fd) {
71 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
72 safe_close(imp->fd);
73 }
74
75 free(imp->name);
76 free(imp->buf);
77 iovw_free_contents(&imp->iovw);
78 }
79
80 static char* realloc_buffer(JournalImporter *imp, size_t size) {
81 char *b, *old = imp->buf;
82
83 b = GREEDY_REALLOC(imp->buf, imp->size, size);
84 if (!b)
85 return NULL;
86
87 iovw_rebase(&imp->iovw, old, imp->buf);
88
89 return b;
90 }
91
92 static int get_line(JournalImporter *imp, char **line, size_t *size) {
93 ssize_t n;
94 char *c = NULL;
95
96 assert(imp);
97 assert(imp->state == IMPORTER_STATE_LINE);
98 assert(imp->offset <= imp->filled);
99 assert(imp->filled <= imp->size);
100 assert(!imp->buf || imp->size > 0);
101 assert(imp->fd >= 0);
102
103 for (;;) {
104 if (imp->buf) {
105 size_t start = MAX(imp->scanned, imp->offset);
106
107 c = memchr(imp->buf + start, '\n',
108 imp->filled - start);
109 if (c != NULL)
110 break;
111 }
112
113 imp->scanned = imp->filled;
114 if (imp->scanned >= DATA_SIZE_MAX) {
115 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
116 return -E2BIG;
117 }
118
119 if (imp->passive_fd)
120 /* we have to wait for some data to come to us */
121 return -EAGAIN;
122
123 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
124 we reallocate it, we'll increase the size at least a bit. */
125 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
126 if (imp->size - imp->filled < LINE_CHUNK &&
127 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
128 return log_oom();
129
130 assert(imp->buf);
131 assert(imp->size - imp->filled >= LINE_CHUNK ||
132 imp->size == ENTRY_SIZE_MAX);
133
134 n = read(imp->fd,
135 imp->buf + imp->filled,
136 imp->size - imp->filled);
137 if (n < 0) {
138 if (errno != EAGAIN)
139 log_error_errno(errno, "read(%d, ..., %zu): %m",
140 imp->fd,
141 imp->size - imp->filled);
142 return -errno;
143 } else if (n == 0)
144 return 0;
145
146 imp->filled += n;
147 }
148
149 *line = imp->buf + imp->offset;
150 *size = c + 1 - imp->buf - imp->offset;
151 imp->offset += *size;
152
153 return 1;
154 }
155
156 static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
157
158 assert(imp);
159 assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
160 assert(size <= DATA_SIZE_MAX);
161 assert(imp->offset <= imp->filled);
162 assert(imp->filled <= imp->size);
163 assert(imp->buf || imp->size == 0);
164 assert(!imp->buf || imp->size > 0);
165 assert(imp->fd >= 0);
166 assert(data);
167
168 while (imp->filled - imp->offset < size) {
169 int n;
170
171 if (imp->passive_fd)
172 /* we have to wait for some data to come to us */
173 return -EAGAIN;
174
175 if (!realloc_buffer(imp, imp->offset + size))
176 return log_oom();
177
178 n = read(imp->fd, imp->buf + imp->filled,
179 imp->size - imp->filled);
180 if (n < 0) {
181 if (errno != EAGAIN)
182 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
183 imp->size - imp->filled);
184 return -errno;
185 } else if (n == 0)
186 return 0;
187
188 imp->filled += n;
189 }
190
191 *data = imp->buf + imp->offset;
192 imp->offset += size;
193
194 return 1;
195 }
196
197 static int get_data_size(JournalImporter *imp) {
198 int r;
199 void *data;
200
201 assert(imp);
202 assert(imp->state == IMPORTER_STATE_DATA_START);
203 assert(imp->data_size == 0);
204
205 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
206 if (r <= 0)
207 return r;
208
209 imp->data_size = unaligned_read_le64(data);
210 if (imp->data_size > DATA_SIZE_MAX) {
211 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
212 imp->data_size, DATA_SIZE_MAX);
213 return -EINVAL;
214 }
215 if (imp->data_size == 0)
216 log_warning("Binary field with zero length");
217
218 return 1;
219 }
220
221 static int get_data_data(JournalImporter *imp, void **data) {
222 int r;
223
224 assert(imp);
225 assert(data);
226 assert(imp->state == IMPORTER_STATE_DATA);
227
228 r = fill_fixed_size(imp, data, imp->data_size);
229 if (r <= 0)
230 return r;
231
232 return 1;
233 }
234
235 static int get_data_newline(JournalImporter *imp) {
236 int r;
237 char *data;
238
239 assert(imp);
240 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
241
242 r = fill_fixed_size(imp, (void**) &data, 1);
243 if (r <= 0)
244 return r;
245
246 assert(data);
247 if (*data != '\n') {
248 log_error("expected newline, got '%c'", *data);
249 return -EINVAL;
250 }
251
252 return 1;
253 }
254
255 static int process_dunder(JournalImporter *imp, char *line, size_t n) {
256 const char *timestamp;
257 int r;
258
259 assert(line);
260 assert(n > 0);
261 assert(line[n-1] == '\n');
262
263 /* XXX: is it worth to support timestamps in extended format?
264 * We don't produce them, but who knows... */
265
266 timestamp = startswith(line, "__CURSOR=");
267 if (timestamp)
268 /* ignore __CURSOR */
269 return 1;
270
271 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
272 if (timestamp) {
273 long long unsigned x;
274 line[n-1] = '\0';
275 r = safe_atollu(timestamp, &x);
276 if (r < 0)
277 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
278 else
279 imp->ts.realtime = x;
280 return r < 0 ? r : 1;
281 }
282
283 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
284 if (timestamp) {
285 long long unsigned x;
286 line[n-1] = '\0';
287 r = safe_atollu(timestamp, &x);
288 if (r < 0)
289 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
290 else
291 imp->ts.monotonic = x;
292 return r < 0 ? r : 1;
293 }
294
295 timestamp = startswith(line, "__");
296 if (timestamp) {
297 log_notice("Unknown dunder line %s", line);
298 return 1;
299 }
300
301 /* no dunder */
302 return 0;
303 }
304
305 int journal_importer_process_data(JournalImporter *imp) {
306 int r;
307
308 switch(imp->state) {
309 case IMPORTER_STATE_LINE: {
310 char *line, *sep;
311 size_t n = 0;
312
313 assert(imp->data_size == 0);
314
315 r = get_line(imp, &line, &n);
316 if (r < 0)
317 return r;
318 if (r == 0) {
319 imp->state = IMPORTER_STATE_EOF;
320 return 0;
321 }
322 assert(n > 0);
323 assert(line[n-1] == '\n');
324
325 if (n == 1) {
326 log_trace("Received empty line, event is ready");
327 return 1;
328 }
329
330 r = process_dunder(imp, line, n);
331 if (r != 0)
332 return r < 0 ? r : 0;
333
334 /* MESSAGE=xxx\n
335 or
336 COREDUMP\n
337 LLLLLLLL0011223344...\n
338 */
339 sep = memchr(line, '=', n);
340 if (sep) {
341 /* chomp newline */
342 n--;
343
344 r = iovw_put(&imp->iovw, line, n);
345 if (r < 0)
346 return r;
347 } else {
348 /* replace \n with = */
349 line[n-1] = '=';
350
351 imp->field_len = n;
352 imp->state = IMPORTER_STATE_DATA_START;
353
354 /* we cannot put the field in iovec until we have all data */
355 }
356
357 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
358
359 return 0; /* continue */
360 }
361
362 case IMPORTER_STATE_DATA_START:
363 assert(imp->data_size == 0);
364
365 r = get_data_size(imp);
366 // log_debug("get_data_size() -> %d", r);
367 if (r < 0)
368 return r;
369 if (r == 0) {
370 imp->state = IMPORTER_STATE_EOF;
371 return 0;
372 }
373
374 imp->state = imp->data_size > 0 ?
375 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
376
377 return 0; /* continue */
378
379 case IMPORTER_STATE_DATA: {
380 void *data;
381 char *field;
382
383 assert(imp->data_size > 0);
384
385 r = get_data_data(imp, &data);
386 // log_debug("get_data_data() -> %d", r);
387 if (r < 0)
388 return r;
389 if (r == 0) {
390 imp->state = IMPORTER_STATE_EOF;
391 return 0;
392 }
393
394 assert(data);
395
396 field = (char*) data - sizeof(uint64_t) - imp->field_len;
397 memmove(field + sizeof(uint64_t), field, imp->field_len);
398
399 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
400 if (r < 0)
401 return r;
402
403 imp->state = IMPORTER_STATE_DATA_FINISH;
404
405 return 0; /* continue */
406 }
407
408 case IMPORTER_STATE_DATA_FINISH:
409 r = get_data_newline(imp);
410 // log_debug("get_data_newline() -> %d", r);
411 if (r < 0)
412 return r;
413 if (r == 0) {
414 imp->state = IMPORTER_STATE_EOF;
415 return 0;
416 }
417
418 imp->data_size = 0;
419 imp->state = IMPORTER_STATE_LINE;
420
421 return 0; /* continue */
422 default:
423 assert_not_reached("wtf?");
424 }
425 }
426
427 int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
428 assert(imp);
429 assert(imp->state != IMPORTER_STATE_EOF);
430
431 if (!realloc_buffer(imp, imp->filled + size)) {
432 log_error("Failed to store received data of size %zu "
433 "(in addition to existing %zu bytes with %zu filled): %s",
434 size, imp->size, imp->filled, strerror(ENOMEM));
435 return -ENOMEM;
436 }
437
438 memcpy(imp->buf + imp->filled, data, size);
439 imp->filled += size;
440
441 return 0;
442 }
443
444 void journal_importer_drop_iovw(JournalImporter *imp) {
445 size_t remain, target;
446
447 /* This function drops processed data that along with the iovw that points at it */
448
449 iovw_free_contents(&imp->iovw);
450
451 /* possibly reset buffer position */
452 remain = imp->filled - imp->offset;
453
454 if (remain == 0) /* no brainer */
455 imp->offset = imp->scanned = imp->filled = 0;
456 else if (imp->offset > imp->size - imp->filled &&
457 imp->offset > remain) {
458 memcpy(imp->buf, imp->buf + imp->offset, remain);
459 imp->offset = imp->scanned = 0;
460 imp->filled = remain;
461 }
462
463 target = imp->size;
464 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
465 target /= 2;
466 if (target < imp->size) {
467 char *tmp;
468
469 tmp = realloc(imp->buf, target);
470 if (!tmp)
471 log_warning("Failed to reallocate buffer to (smaller) size %zu",
472 target);
473 else {
474 log_debug("Reallocated buffer from %zu to %zu bytes",
475 imp->size, target);
476 imp->buf = tmp;
477 imp->size = target;
478 }
479 }
480 }
481
482 bool journal_importer_eof(const JournalImporter *imp) {
483 return imp->state == IMPORTER_STATE_EOF;
484 }