]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/basic/journal-importer.c
Merge pull request #6924 from andir/vrf-dhcpv4
[thirdparty/systemd.git] / src / basic / journal-importer.c
1 /***
2 This file is part of systemd.
3
4 Copyright 2014 Zbigniew Jędrzejewski-Szmek
5
6 systemd is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 systemd is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with systemd; If not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #include <unistd.h>
21
22 #include "alloc-util.h"
23 #include "fd-util.h"
24 #include "io-util.h"
25 #include "journal-importer.h"
26 #include "parse-util.h"
27 #include "string-util.h"
28 #include "unaligned.h"
29
30 enum {
31 IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
32 IMPORTER_STATE_DATA_START, /* reading binary data header */
33 IMPORTER_STATE_DATA, /* reading binary data */
34 IMPORTER_STATE_DATA_FINISH, /* expecting newline */
35 IMPORTER_STATE_EOF, /* done */
36 };
37
38 static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
39 if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
40 return log_oom();
41
42 iovw->iovec[iovw->count++] = IOVEC_MAKE(data, len);
43 return 0;
44 }
45
46 static void iovw_free_contents(struct iovec_wrapper *iovw) {
47 iovw->iovec = mfree(iovw->iovec);
48 iovw->size_bytes = iovw->count = 0;
49 }
50
51 static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
52 size_t i;
53
54 for (i = 0; i < iovw->count; i++)
55 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
56 }
57
58 size_t iovw_size(struct iovec_wrapper *iovw) {
59 size_t n = 0, i;
60
61 for (i = 0; i < iovw->count; i++)
62 n += iovw->iovec[i].iov_len;
63
64 return n;
65 }
66
67 void journal_importer_cleanup(JournalImporter *imp) {
68 if (imp->fd >= 0 && !imp->passive_fd) {
69 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
70 safe_close(imp->fd);
71 }
72
73 free(imp->name);
74 free(imp->buf);
75 iovw_free_contents(&imp->iovw);
76 }
77
78 static char* realloc_buffer(JournalImporter *imp, size_t size) {
79 char *b, *old = imp->buf;
80
81 b = GREEDY_REALLOC(imp->buf, imp->size, size);
82 if (!b)
83 return NULL;
84
85 iovw_rebase(&imp->iovw, old, imp->buf);
86
87 return b;
88 }
89
90 static int get_line(JournalImporter *imp, char **line, size_t *size) {
91 ssize_t n;
92 char *c = NULL;
93
94 assert(imp);
95 assert(imp->state == IMPORTER_STATE_LINE);
96 assert(imp->offset <= imp->filled);
97 assert(imp->filled <= imp->size);
98 assert(imp->buf == NULL || imp->size > 0);
99 assert(imp->fd >= 0);
100
101 for (;;) {
102 if (imp->buf) {
103 size_t start = MAX(imp->scanned, imp->offset);
104
105 c = memchr(imp->buf + start, '\n',
106 imp->filled - start);
107 if (c != NULL)
108 break;
109 }
110
111 imp->scanned = imp->filled;
112 if (imp->scanned >= DATA_SIZE_MAX) {
113 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
114 return -E2BIG;
115 }
116
117 if (imp->passive_fd)
118 /* we have to wait for some data to come to us */
119 return -EAGAIN;
120
121 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
122 we reallocate it, we'll increase the size at least a bit. */
123 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
124 if (imp->size - imp->filled < LINE_CHUNK &&
125 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
126 return log_oom();
127
128 assert(imp->buf);
129 assert(imp->size - imp->filled >= LINE_CHUNK ||
130 imp->size == ENTRY_SIZE_MAX);
131
132 n = read(imp->fd,
133 imp->buf + imp->filled,
134 imp->size - imp->filled);
135 if (n < 0) {
136 if (errno != EAGAIN)
137 log_error_errno(errno, "read(%d, ..., %zu): %m",
138 imp->fd,
139 imp->size - imp->filled);
140 return -errno;
141 } else if (n == 0)
142 return 0;
143
144 imp->filled += n;
145 }
146
147 *line = imp->buf + imp->offset;
148 *size = c + 1 - imp->buf - imp->offset;
149 imp->offset += *size;
150
151 return 1;
152 }
153
154 static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
155
156 assert(imp);
157 assert(imp->state == IMPORTER_STATE_DATA_START ||
158 imp->state == IMPORTER_STATE_DATA ||
159 imp->state == IMPORTER_STATE_DATA_FINISH);
160 assert(size <= DATA_SIZE_MAX);
161 assert(imp->offset <= imp->filled);
162 assert(imp->filled <= imp->size);
163 assert(imp->buf != NULL || imp->size == 0);
164 assert(imp->buf == NULL || imp->size > 0);
165 assert(imp->fd >= 0);
166 assert(data);
167
168 while (imp->filled - imp->offset < size) {
169 int n;
170
171 if (imp->passive_fd)
172 /* we have to wait for some data to come to us */
173 return -EAGAIN;
174
175 if (!realloc_buffer(imp, imp->offset + size))
176 return log_oom();
177
178 n = read(imp->fd, imp->buf + imp->filled,
179 imp->size - imp->filled);
180 if (n < 0) {
181 if (errno != EAGAIN)
182 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
183 imp->size - imp->filled);
184 return -errno;
185 } else if (n == 0)
186 return 0;
187
188 imp->filled += n;
189 }
190
191 *data = imp->buf + imp->offset;
192 imp->offset += size;
193
194 return 1;
195 }
196
197 static int get_data_size(JournalImporter *imp) {
198 int r;
199 void *data;
200
201 assert(imp);
202 assert(imp->state == IMPORTER_STATE_DATA_START);
203 assert(imp->data_size == 0);
204
205 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
206 if (r <= 0)
207 return r;
208
209 imp->data_size = unaligned_read_le64(data);
210 if (imp->data_size > DATA_SIZE_MAX) {
211 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
212 imp->data_size, DATA_SIZE_MAX);
213 return -EINVAL;
214 }
215 if (imp->data_size == 0)
216 log_warning("Binary field with zero length");
217
218 return 1;
219 }
220
221 static int get_data_data(JournalImporter *imp, void **data) {
222 int r;
223
224 assert(imp);
225 assert(data);
226 assert(imp->state == IMPORTER_STATE_DATA);
227
228 r = fill_fixed_size(imp, data, imp->data_size);
229 if (r <= 0)
230 return r;
231
232 return 1;
233 }
234
235 static int get_data_newline(JournalImporter *imp) {
236 int r;
237 char *data;
238
239 assert(imp);
240 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
241
242 r = fill_fixed_size(imp, (void**) &data, 1);
243 if (r <= 0)
244 return r;
245
246 assert(data);
247 if (*data != '\n') {
248 log_error("expected newline, got '%c'", *data);
249 return -EINVAL;
250 }
251
252 return 1;
253 }
254
255 static int process_dunder(JournalImporter *imp, char *line, size_t n) {
256 const char *timestamp;
257 int r;
258
259 assert(line);
260 assert(n > 0);
261 assert(line[n-1] == '\n');
262
263 /* XXX: is it worth to support timestamps in extended format?
264 * We don't produce them, but who knows... */
265
266 timestamp = startswith(line, "__CURSOR=");
267 if (timestamp)
268 /* ignore __CURSOR */
269 return 1;
270
271 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
272 if (timestamp) {
273 long long unsigned x;
274 line[n-1] = '\0';
275 r = safe_atollu(timestamp, &x);
276 if (r < 0)
277 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
278 else
279 imp->ts.realtime = x;
280 return r < 0 ? r : 1;
281 }
282
283 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
284 if (timestamp) {
285 long long unsigned x;
286 line[n-1] = '\0';
287 r = safe_atollu(timestamp, &x);
288 if (r < 0)
289 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
290 else
291 imp->ts.monotonic = x;
292 return r < 0 ? r : 1;
293 }
294
295 timestamp = startswith(line, "__");
296 if (timestamp) {
297 log_notice("Unknown dunder line %s", line);
298 return 1;
299 }
300
301 /* no dunder */
302 return 0;
303 }
304
305 int journal_importer_process_data(JournalImporter *imp) {
306 int r;
307
308 switch(imp->state) {
309 case IMPORTER_STATE_LINE: {
310 char *line, *sep;
311 size_t n = 0;
312
313 assert(imp->data_size == 0);
314
315 r = get_line(imp, &line, &n);
316 if (r < 0)
317 return r;
318 if (r == 0) {
319 imp->state = IMPORTER_STATE_EOF;
320 return 0;
321 }
322 assert(n > 0);
323 assert(line[n-1] == '\n');
324
325 if (n == 1) {
326 log_trace("Received empty line, event is ready");
327 return 1;
328 }
329
330 r = process_dunder(imp, line, n);
331 if (r != 0)
332 return r < 0 ? r : 0;
333
334 /* MESSAGE=xxx\n
335 or
336 COREDUMP\n
337 LLLLLLLL0011223344...\n
338 */
339 sep = memchr(line, '=', n);
340 if (sep) {
341 /* chomp newline */
342 n--;
343
344 r = iovw_put(&imp->iovw, line, n);
345 if (r < 0)
346 return r;
347 } else {
348 /* replace \n with = */
349 line[n-1] = '=';
350
351 imp->field_len = n;
352 imp->state = IMPORTER_STATE_DATA_START;
353
354 /* we cannot put the field in iovec until we have all data */
355 }
356
357 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
358
359 return 0; /* continue */
360 }
361
362 case IMPORTER_STATE_DATA_START:
363 assert(imp->data_size == 0);
364
365 r = get_data_size(imp);
366 // log_debug("get_data_size() -> %d", r);
367 if (r < 0)
368 return r;
369 if (r == 0) {
370 imp->state = IMPORTER_STATE_EOF;
371 return 0;
372 }
373
374 imp->state = imp->data_size > 0 ?
375 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
376
377 return 0; /* continue */
378
379 case IMPORTER_STATE_DATA: {
380 void *data;
381 char *field;
382
383 assert(imp->data_size > 0);
384
385 r = get_data_data(imp, &data);
386 // log_debug("get_data_data() -> %d", r);
387 if (r < 0)
388 return r;
389 if (r == 0) {
390 imp->state = IMPORTER_STATE_EOF;
391 return 0;
392 }
393
394 assert(data);
395
396 field = (char*) data - sizeof(uint64_t) - imp->field_len;
397 memmove(field + sizeof(uint64_t), field, imp->field_len);
398
399 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
400 if (r < 0)
401 return r;
402
403 imp->state = IMPORTER_STATE_DATA_FINISH;
404
405 return 0; /* continue */
406 }
407
408 case IMPORTER_STATE_DATA_FINISH:
409 r = get_data_newline(imp);
410 // log_debug("get_data_newline() -> %d", r);
411 if (r < 0)
412 return r;
413 if (r == 0) {
414 imp->state = IMPORTER_STATE_EOF;
415 return 0;
416 }
417
418 imp->data_size = 0;
419 imp->state = IMPORTER_STATE_LINE;
420
421 return 0; /* continue */
422 default:
423 assert_not_reached("wtf?");
424 }
425 }
426
427 int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
428 assert(imp);
429 assert(imp->state != IMPORTER_STATE_EOF);
430
431 if (!realloc_buffer(imp, imp->filled + size)) {
432 log_error("Failed to store received data of size %zu "
433 "(in addition to existing %zu bytes with %zu filled): %s",
434 size, imp->size, imp->filled, strerror(ENOMEM));
435 return -ENOMEM;
436 }
437
438 memcpy(imp->buf + imp->filled, data, size);
439 imp->filled += size;
440
441 return 0;
442 }
443
444 void journal_importer_drop_iovw(JournalImporter *imp) {
445 size_t remain, target;
446
447 /* This function drops processed data that along with the iovw that points at it */
448
449 iovw_free_contents(&imp->iovw);
450
451 /* possibly reset buffer position */
452 remain = imp->filled - imp->offset;
453
454 if (remain == 0) /* no brainer */
455 imp->offset = imp->scanned = imp->filled = 0;
456 else if (imp->offset > imp->size - imp->filled &&
457 imp->offset > remain) {
458 memcpy(imp->buf, imp->buf + imp->offset, remain);
459 imp->offset = imp->scanned = 0;
460 imp->filled = remain;
461 }
462
463 target = imp->size;
464 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
465 target /= 2;
466 if (target < imp->size) {
467 char *tmp;
468
469 tmp = realloc(imp->buf, target);
470 if (!tmp)
471 log_warning("Failed to reallocate buffer to (smaller) size %zu",
472 target);
473 else {
474 log_debug("Reallocated buffer from %zu to %zu bytes",
475 imp->size, target);
476 imp->buf = tmp;
477 imp->size = target;
478 }
479 }
480 }
481
482 bool journal_importer_eof(const JournalImporter *imp) {
483 return imp->state == IMPORTER_STATE_EOF;
484 }