]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/shared/journal-importer.c
catalog: update Polish translation
[thirdparty/systemd.git] / src / shared / journal-importer.c
1 /* SPDX-License-Identifier: LGPL-2.1+ */
2
3 #include <errno.h>
4 #include <unistd.h>
5
6 #include "alloc-util.h"
7 #include "errno-util.h"
8 #include "escape.h"
9 #include "fd-util.h"
10 #include "io-util.h"
11 #include "journal-file.h"
12 #include "journal-importer.h"
13 #include "journal-util.h"
14 #include "parse-util.h"
15 #include "string-util.h"
16 #include "unaligned.h"
17
18 enum {
19 IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
20 IMPORTER_STATE_DATA_START, /* reading binary data header */
21 IMPORTER_STATE_DATA, /* reading binary data */
22 IMPORTER_STATE_DATA_FINISH, /* expecting newline */
23 IMPORTER_STATE_EOF, /* done */
24 };
25
26 void journal_importer_cleanup(JournalImporter *imp) {
27 if (imp->fd >= 0 && !imp->passive_fd) {
28 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
29 safe_close(imp->fd);
30 }
31
32 free(imp->name);
33 free(imp->buf);
34 iovw_free_contents(&imp->iovw, false);
35 }
36
37 static char* realloc_buffer(JournalImporter *imp, size_t size) {
38 char *b, *old = imp->buf;
39
40 b = GREEDY_REALLOC(imp->buf, imp->size, size);
41 if (!b)
42 return NULL;
43
44 iovw_rebase(&imp->iovw, old, imp->buf);
45
46 return b;
47 }
48
49 static int get_line(JournalImporter *imp, char **line, size_t *size) {
50 ssize_t n;
51 char *c = NULL;
52
53 assert(imp);
54 assert(imp->state == IMPORTER_STATE_LINE);
55 assert(imp->offset <= imp->filled);
56 assert(imp->filled <= imp->size);
57 assert(!imp->buf || imp->size > 0);
58 assert(imp->fd >= 0);
59
60 for (;;) {
61 if (imp->buf) {
62 size_t start = MAX(imp->scanned, imp->offset);
63
64 c = memchr(imp->buf + start, '\n',
65 imp->filled - start);
66 if (c)
67 break;
68 }
69
70 imp->scanned = imp->filled;
71 if (imp->scanned >= DATA_SIZE_MAX)
72 return log_error_errno(SYNTHETIC_ERRNO(ENOBUFS),
73 "Entry is bigger than %u bytes.",
74 DATA_SIZE_MAX);
75
76 if (imp->passive_fd)
77 /* we have to wait for some data to come to us */
78 return -EAGAIN;
79
80 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
81 we reallocate it, we'll increase the size at least a bit. */
82 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
83 if (imp->size - imp->filled < LINE_CHUNK &&
84 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
85 return log_oom();
86
87 assert(imp->buf);
88 assert(imp->size - imp->filled >= LINE_CHUNK ||
89 imp->size == ENTRY_SIZE_MAX);
90
91 n = read(imp->fd,
92 imp->buf + imp->filled,
93 imp->size - imp->filled);
94 if (n < 0) {
95 if (errno != EAGAIN)
96 log_error_errno(errno, "read(%d, ..., %zu): %m",
97 imp->fd,
98 imp->size - imp->filled);
99 return -errno;
100 } else if (n == 0)
101 return 0;
102
103 imp->filled += n;
104 }
105
106 *line = imp->buf + imp->offset;
107 *size = c + 1 - imp->buf - imp->offset;
108 imp->offset += *size;
109
110 return 1;
111 }
112
113 static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
114
115 assert(imp);
116 assert(IN_SET(imp->state, IMPORTER_STATE_DATA_START, IMPORTER_STATE_DATA, IMPORTER_STATE_DATA_FINISH));
117 assert(size <= DATA_SIZE_MAX);
118 assert(imp->offset <= imp->filled);
119 assert(imp->filled <= imp->size);
120 assert(imp->buf || imp->size == 0);
121 assert(!imp->buf || imp->size > 0);
122 assert(imp->fd >= 0);
123 assert(data);
124
125 while (imp->filled - imp->offset < size) {
126 int n;
127
128 if (imp->passive_fd)
129 /* we have to wait for some data to come to us */
130 return -EAGAIN;
131
132 if (!realloc_buffer(imp, imp->offset + size))
133 return log_oom();
134
135 n = read(imp->fd, imp->buf + imp->filled,
136 imp->size - imp->filled);
137 if (n < 0) {
138 if (errno != EAGAIN)
139 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
140 imp->size - imp->filled);
141 return -errno;
142 } else if (n == 0)
143 return 0;
144
145 imp->filled += n;
146 }
147
148 *data = imp->buf + imp->offset;
149 imp->offset += size;
150
151 return 1;
152 }
153
154 static int get_data_size(JournalImporter *imp) {
155 int r;
156 void *data;
157
158 assert(imp);
159 assert(imp->state == IMPORTER_STATE_DATA_START);
160 assert(imp->data_size == 0);
161
162 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
163 if (r <= 0)
164 return r;
165
166 imp->data_size = unaligned_read_le64(data);
167 if (imp->data_size > DATA_SIZE_MAX)
168 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
169 "Stream declares field with size %zu > DATA_SIZE_MAX = %u",
170 imp->data_size, DATA_SIZE_MAX);
171 if (imp->data_size == 0)
172 log_warning("Binary field with zero length");
173
174 return 1;
175 }
176
177 static int get_data_data(JournalImporter *imp, void **data) {
178 int r;
179
180 assert(imp);
181 assert(data);
182 assert(imp->state == IMPORTER_STATE_DATA);
183
184 r = fill_fixed_size(imp, data, imp->data_size);
185 if (r <= 0)
186 return r;
187
188 return 1;
189 }
190
191 static int get_data_newline(JournalImporter *imp) {
192 int r;
193 char *data;
194
195 assert(imp);
196 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
197
198 r = fill_fixed_size(imp, (void**) &data, 1);
199 if (r <= 0)
200 return r;
201
202 assert(data);
203 if (*data != '\n') {
204 char buf[4];
205 int l;
206
207 l = cescape_char(*data, buf);
208 return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
209 "Expected newline, got '%.*s'", l, buf);
210 }
211
212 return 1;
213 }
214
215 static int process_special_field(JournalImporter *imp, char *line) {
216 const char *value;
217 char buf[CELLESCAPE_DEFAULT_LENGTH];
218 int r;
219
220 assert(line);
221
222 value = startswith(line, "__CURSOR=");
223 if (value)
224 /* ignore __CURSOR */
225 return 1;
226
227 value = startswith(line, "__REALTIME_TIMESTAMP=");
228 if (value) {
229 uint64_t x;
230
231 r = safe_atou64(value, &x);
232 if (r < 0)
233 return log_warning_errno(r, "Failed to parse __REALTIME_TIMESTAMP '%s': %m",
234 cellescape(buf, sizeof buf, value));
235 else if (!VALID_REALTIME(x)) {
236 log_warning("__REALTIME_TIMESTAMP out of range, ignoring: %"PRIu64, x);
237 return -ERANGE;
238 }
239
240 imp->ts.realtime = x;
241 return 1;
242 }
243
244 value = startswith(line, "__MONOTONIC_TIMESTAMP=");
245 if (value) {
246 uint64_t x;
247
248 r = safe_atou64(value, &x);
249 if (r < 0)
250 return log_warning_errno(r, "Failed to parse __MONOTONIC_TIMESTAMP '%s': %m",
251 cellescape(buf, sizeof buf, value));
252 else if (!VALID_MONOTONIC(x)) {
253 log_warning("__MONOTONIC_TIMESTAMP out of range, ignoring: %"PRIu64, x);
254 return -ERANGE;
255 }
256
257 imp->ts.monotonic = x;
258 return 1;
259 }
260
261 /* Just a single underline, but it needs special treatment too. */
262 value = startswith(line, "_BOOT_ID=");
263 if (value) {
264 r = sd_id128_from_string(value, &imp->boot_id);
265 if (r < 0)
266 return log_warning_errno(r, "Failed to parse _BOOT_ID '%s': %m",
267 cellescape(buf, sizeof buf, value));
268
269 /* store the field in the usual fashion too */
270 return 0;
271 }
272
273 value = startswith(line, "__");
274 if (value) {
275 log_notice("Unknown dunder line __%s, ignoring.", cellescape(buf, sizeof buf, value));
276 return 1;
277 }
278
279 /* no dunder */
280 return 0;
281 }
282
283 int journal_importer_process_data(JournalImporter *imp) {
284 int r;
285
286 switch(imp->state) {
287 case IMPORTER_STATE_LINE: {
288 char *line, *sep;
289 size_t n = 0;
290
291 assert(imp->data_size == 0);
292
293 r = get_line(imp, &line, &n);
294 if (r < 0)
295 return r;
296 if (r == 0) {
297 imp->state = IMPORTER_STATE_EOF;
298 return 0;
299 }
300 assert(n > 0);
301 assert(line[n-1] == '\n');
302
303 if (n == 1) {
304 log_trace("Received empty line, event is ready");
305 return 1;
306 }
307
308 /* MESSAGE=xxx\n
309 or
310 COREDUMP\n
311 LLLLLLLL0011223344...\n
312 */
313 sep = memchr(line, '=', n);
314 if (sep) {
315 /* chomp newline */
316 n--;
317
318 if (!journal_field_valid(line, sep - line, true)) {
319 char buf[64], *t;
320
321 t = strndupa(line, sep - line);
322 log_debug("Ignoring invalid field: \"%s\"",
323 cellescape(buf, sizeof buf, t));
324
325 return 0;
326 }
327
328 line[n] = '\0';
329 r = process_special_field(imp, line);
330 if (r != 0)
331 return r < 0 ? r : 0;
332
333 r = iovw_put(&imp->iovw, line, n);
334 if (r < 0)
335 return r;
336 } else {
337 /* replace \n with = */
338 line[n-1] = '=';
339
340 imp->field_len = n;
341 imp->state = IMPORTER_STATE_DATA_START;
342
343 /* we cannot put the field in iovec until we have all data */
344 }
345
346 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
347
348 return 0; /* continue */
349 }
350
351 case IMPORTER_STATE_DATA_START:
352 assert(imp->data_size == 0);
353
354 r = get_data_size(imp);
355 // log_debug("get_data_size() -> %d", r);
356 if (r < 0)
357 return r;
358 if (r == 0) {
359 imp->state = IMPORTER_STATE_EOF;
360 return 0;
361 }
362
363 imp->state = imp->data_size > 0 ?
364 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
365
366 return 0; /* continue */
367
368 case IMPORTER_STATE_DATA: {
369 void *data;
370 char *field;
371
372 assert(imp->data_size > 0);
373
374 r = get_data_data(imp, &data);
375 // log_debug("get_data_data() -> %d", r);
376 if (r < 0)
377 return r;
378 if (r == 0) {
379 imp->state = IMPORTER_STATE_EOF;
380 return 0;
381 }
382
383 assert(data);
384
385 field = (char*) data - sizeof(uint64_t) - imp->field_len;
386 memmove(field + sizeof(uint64_t), field, imp->field_len);
387
388 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
389 if (r < 0)
390 return r;
391
392 imp->state = IMPORTER_STATE_DATA_FINISH;
393
394 return 0; /* continue */
395 }
396
397 case IMPORTER_STATE_DATA_FINISH:
398 r = get_data_newline(imp);
399 // log_debug("get_data_newline() -> %d", r);
400 if (r < 0)
401 return r;
402 if (r == 0) {
403 imp->state = IMPORTER_STATE_EOF;
404 return 0;
405 }
406
407 imp->data_size = 0;
408 imp->state = IMPORTER_STATE_LINE;
409
410 return 0; /* continue */
411 default:
412 assert_not_reached("wtf?");
413 }
414 }
415
416 int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
417 assert(imp);
418 assert(imp->state != IMPORTER_STATE_EOF);
419
420 if (!realloc_buffer(imp, imp->filled + size))
421 return log_error_errno(SYNTHETIC_ERRNO(ENOMEM),
422 "Failed to store received data of size %zu "
423 "(in addition to existing %zu bytes with %zu filled): %s",
424 size, imp->size, imp->filled,
425 strerror_safe(ENOMEM));
426
427 memcpy(imp->buf + imp->filled, data, size);
428 imp->filled += size;
429
430 return 0;
431 }
432
433 void journal_importer_drop_iovw(JournalImporter *imp) {
434 size_t remain, target;
435
436 /* This function drops processed data that along with the iovw that points at it */
437
438 iovw_free_contents(&imp->iovw, false);
439
440 /* possibly reset buffer position */
441 remain = imp->filled - imp->offset;
442
443 if (remain == 0) /* no brainer */
444 imp->offset = imp->scanned = imp->filled = 0;
445 else if (imp->offset > imp->size - imp->filled &&
446 imp->offset > remain) {
447 memcpy(imp->buf, imp->buf + imp->offset, remain);
448 imp->offset = imp->scanned = 0;
449 imp->filled = remain;
450 }
451
452 target = imp->size;
453 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
454 target /= 2;
455 if (target < imp->size) {
456 char *tmp;
457
458 tmp = realloc(imp->buf, target);
459 if (!tmp)
460 log_warning("Failed to reallocate buffer to (smaller) size %zu",
461 target);
462 else {
463 log_debug("Reallocated buffer from %zu to %zu bytes",
464 imp->size, target);
465 imp->buf = tmp;
466 imp->size = target;
467 }
468 }
469 }
470
471 bool journal_importer_eof(const JournalImporter *imp) {
472 return imp->state == IMPORTER_STATE_EOF;
473 }