]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/basic/journal-importer.c
journald: use unaligned_read instead of memcpy
[thirdparty/systemd.git] / src / basic / journal-importer.c
CommitLineData
b18453ed
ZJS
1/***
2 This file is part of systemd.
3
4 Copyright 2014 Zbigniew Jędrzejewski-Szmek
5
6 systemd is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 systemd is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with systemd; If not, see <http://www.gnu.org/licenses/>.
18***/
19
20#include <unistd.h>
21
22#include "alloc-util.h"
23#include "journal-importer.h"
24#include "fd-util.h"
25#include "parse-util.h"
26#include "string-util.h"
f652c62d 27#include "unaligned.h"
b18453ed
ZJS
28
/* States of the importer's parser. The current state records which part of
 * the input stream journal_importer_process_data() will consume next. */
enum {
        IMPORTER_STATE_LINE        = 0, /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START  = 1, /* reading binary data header */
        IMPORTER_STATE_DATA        = 2, /* reading binary data */
        IMPORTER_STATE_DATA_FINISH = 3, /* expecting newline */
        IMPORTER_STATE_EOF         = 4, /* done */
};
36
37static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
38 if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
39 return log_oom();
40
41 iovw->iovec[iovw->count++] = (struct iovec) {data, len};
42 return 0;
43}
44
45static void iovw_free_contents(struct iovec_wrapper *iovw) {
46 iovw->iovec = mfree(iovw->iovec);
47 iovw->size_bytes = iovw->count = 0;
48}
49
50static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
51 size_t i;
52
53 for (i = 0; i < iovw->count; i++)
54 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
55}
56
57size_t iovw_size(struct iovec_wrapper *iovw) {
58 size_t n = 0, i;
59
60 for (i = 0; i < iovw->count; i++)
61 n += iovw->iovec[i].iov_len;
62
63 return n;
64}
65
66void journal_importer_cleanup(JournalImporter *imp) {
67 if (imp->fd >= 0 && !imp->passive_fd) {
68 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
69 safe_close(imp->fd);
70 }
71
72 free(imp->buf);
73 iovw_free_contents(&imp->iovw);
74}
75
76static char* realloc_buffer(JournalImporter *imp, size_t size) {
77 char *b, *old = imp->buf;
78
79 b = GREEDY_REALLOC(imp->buf, imp->size, size);
80 if (!b)
81 return NULL;
82
83 iovw_rebase(&imp->iovw, old, imp->buf);
84
85 return b;
86}
87
88static int get_line(JournalImporter *imp, char **line, size_t *size) {
89 ssize_t n;
90 char *c = NULL;
91
92 assert(imp);
93 assert(imp->state == IMPORTER_STATE_LINE);
94 assert(imp->offset <= imp->filled);
95 assert(imp->filled <= imp->size);
96 assert(imp->buf == NULL || imp->size > 0);
97 assert(imp->fd >= 0);
98
99 for (;;) {
100 if (imp->buf) {
101 size_t start = MAX(imp->scanned, imp->offset);
102
103 c = memchr(imp->buf + start, '\n',
104 imp->filled - start);
105 if (c != NULL)
106 break;
107 }
108
109 imp->scanned = imp->filled;
110 if (imp->scanned >= DATA_SIZE_MAX) {
111 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
112 return -E2BIG;
113 }
114
115 if (imp->passive_fd)
116 /* we have to wait for some data to come to us */
117 return -EAGAIN;
118
119 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
120 we reallocate it, we'll increase the size at least a bit. */
121 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
122 if (imp->size - imp->filled < LINE_CHUNK &&
123 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
124 return log_oom();
125
126 assert(imp->buf);
127 assert(imp->size - imp->filled >= LINE_CHUNK ||
128 imp->size == ENTRY_SIZE_MAX);
129
130 n = read(imp->fd,
131 imp->buf + imp->filled,
132 imp->size - imp->filled);
133 if (n < 0) {
134 if (errno != EAGAIN)
135 log_error_errno(errno, "read(%d, ..., %zu): %m",
136 imp->fd,
137 imp->size - imp->filled);
138 return -errno;
139 } else if (n == 0)
140 return 0;
141
142 imp->filled += n;
143 }
144
145 *line = imp->buf + imp->offset;
146 *size = c + 1 - imp->buf - imp->offset;
147 imp->offset += *size;
148
149 return 1;
150}
151
152static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
153
154 assert(imp);
155 assert(imp->state == IMPORTER_STATE_DATA_START ||
156 imp->state == IMPORTER_STATE_DATA ||
157 imp->state == IMPORTER_STATE_DATA_FINISH);
158 assert(size <= DATA_SIZE_MAX);
159 assert(imp->offset <= imp->filled);
160 assert(imp->filled <= imp->size);
161 assert(imp->buf != NULL || imp->size == 0);
162 assert(imp->buf == NULL || imp->size > 0);
163 assert(imp->fd >= 0);
164 assert(data);
165
166 while (imp->filled - imp->offset < size) {
167 int n;
168
169 if (imp->passive_fd)
170 /* we have to wait for some data to come to us */
171 return -EAGAIN;
172
173 if (!realloc_buffer(imp, imp->offset + size))
174 return log_oom();
175
176 n = read(imp->fd, imp->buf + imp->filled,
177 imp->size - imp->filled);
178 if (n < 0) {
179 if (errno != EAGAIN)
180 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
181 imp->size - imp->filled);
182 return -errno;
183 } else if (n == 0)
184 return 0;
185
186 imp->filled += n;
187 }
188
189 *data = imp->buf + imp->offset;
190 imp->offset += size;
191
192 return 1;
193}
194
195static int get_data_size(JournalImporter *imp) {
196 int r;
197 void *data;
198
199 assert(imp);
200 assert(imp->state == IMPORTER_STATE_DATA_START);
201 assert(imp->data_size == 0);
202
203 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
204 if (r <= 0)
205 return r;
206
f652c62d 207 imp->data_size = unaligned_read_le64(data);
b18453ed
ZJS
208 if (imp->data_size > DATA_SIZE_MAX) {
209 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
210 imp->data_size, DATA_SIZE_MAX);
211 return -EINVAL;
212 }
213 if (imp->data_size == 0)
214 log_warning("Binary field with zero length");
215
216 return 1;
217}
218
219static int get_data_data(JournalImporter *imp, void **data) {
220 int r;
221
222 assert(imp);
223 assert(data);
224 assert(imp->state == IMPORTER_STATE_DATA);
225
226 r = fill_fixed_size(imp, data, imp->data_size);
227 if (r <= 0)
228 return r;
229
230 return 1;
231}
232
233static int get_data_newline(JournalImporter *imp) {
234 int r;
235 char *data;
236
237 assert(imp);
238 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
239
240 r = fill_fixed_size(imp, (void**) &data, 1);
241 if (r <= 0)
242 return r;
243
244 assert(data);
245 if (*data != '\n') {
246 log_error("expected newline, got '%c'", *data);
247 return -EINVAL;
248 }
249
250 return 1;
251}
252
253static int process_dunder(JournalImporter *imp, char *line, size_t n) {
254 const char *timestamp;
255 int r;
256
257 assert(line);
258 assert(n > 0);
259 assert(line[n-1] == '\n');
260
261 /* XXX: is it worth to support timestamps in extended format?
262 * We don't produce them, but who knows... */
263
264 timestamp = startswith(line, "__CURSOR=");
265 if (timestamp)
266 /* ignore __CURSOR */
267 return 1;
268
269 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
270 if (timestamp) {
271 long long unsigned x;
272 line[n-1] = '\0';
273 r = safe_atollu(timestamp, &x);
274 if (r < 0)
275 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
276 else
277 imp->ts.realtime = x;
278 return r < 0 ? r : 1;
279 }
280
281 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
282 if (timestamp) {
283 long long unsigned x;
284 line[n-1] = '\0';
285 r = safe_atollu(timestamp, &x);
286 if (r < 0)
287 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
288 else
289 imp->ts.monotonic = x;
290 return r < 0 ? r : 1;
291 }
292
293 timestamp = startswith(line, "__");
294 if (timestamp) {
295 log_notice("Unknown dunder line %s", line);
296 return 1;
297 }
298
299 /* no dunder */
300 return 0;
301}
302
303int journal_importer_process_data(JournalImporter *imp) {
304 int r;
305
306 switch(imp->state) {
307 case IMPORTER_STATE_LINE: {
308 char *line, *sep;
309 size_t n = 0;
310
311 assert(imp->data_size == 0);
312
313 r = get_line(imp, &line, &n);
314 if (r < 0)
315 return r;
316 if (r == 0) {
317 imp->state = IMPORTER_STATE_EOF;
d74dc4f2 318 return 0;
b18453ed
ZJS
319 }
320 assert(n > 0);
321 assert(line[n-1] == '\n');
322
323 if (n == 1) {
324 log_trace("Received empty line, event is ready");
325 return 1;
326 }
327
328 r = process_dunder(imp, line, n);
329 if (r != 0)
330 return r < 0 ? r : 0;
331
332 /* MESSAGE=xxx\n
333 or
334 COREDUMP\n
335 LLLLLLLL0011223344...\n
336 */
337 sep = memchr(line, '=', n);
338 if (sep) {
339 /* chomp newline */
340 n--;
341
342 r = iovw_put(&imp->iovw, line, n);
343 if (r < 0)
344 return r;
345 } else {
346 /* replace \n with = */
347 line[n-1] = '=';
348
349 imp->field_len = n;
350 imp->state = IMPORTER_STATE_DATA_START;
351
352 /* we cannot put the field in iovec until we have all data */
353 }
354
355 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
356
357 return 0; /* continue */
358 }
359
360 case IMPORTER_STATE_DATA_START:
361 assert(imp->data_size == 0);
362
363 r = get_data_size(imp);
364 // log_debug("get_data_size() -> %d", r);
365 if (r < 0)
366 return r;
367 if (r == 0) {
368 imp->state = IMPORTER_STATE_EOF;
369 return 0;
370 }
371
372 imp->state = imp->data_size > 0 ?
373 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
374
375 return 0; /* continue */
376
377 case IMPORTER_STATE_DATA: {
378 void *data;
379 char *field;
380
381 assert(imp->data_size > 0);
382
383 r = get_data_data(imp, &data);
384 // log_debug("get_data_data() -> %d", r);
385 if (r < 0)
386 return r;
387 if (r == 0) {
388 imp->state = IMPORTER_STATE_EOF;
389 return 0;
390 }
391
392 assert(data);
393
394 field = (char*) data - sizeof(uint64_t) - imp->field_len;
395 memmove(field + sizeof(uint64_t), field, imp->field_len);
396
397 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
398 if (r < 0)
399 return r;
400
401 imp->state = IMPORTER_STATE_DATA_FINISH;
402
403 return 0; /* continue */
404 }
405
406 case IMPORTER_STATE_DATA_FINISH:
407 r = get_data_newline(imp);
408 // log_debug("get_data_newline() -> %d", r);
409 if (r < 0)
410 return r;
411 if (r == 0) {
412 imp->state = IMPORTER_STATE_EOF;
413 return 0;
414 }
415
416 imp->data_size = 0;
417 imp->state = IMPORTER_STATE_LINE;
418
419 return 0; /* continue */
420 default:
421 assert_not_reached("wtf?");
422 }
423}
424
425int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
426 assert(imp);
427 assert(imp->state != IMPORTER_STATE_EOF);
428
429 if (!realloc_buffer(imp, imp->filled + size)) {
430 log_error("Failed to store received data of size %zu "
431 "(in addition to existing %zu bytes with %zu filled): %s",
432 size, imp->size, imp->filled, strerror(ENOMEM));
433 return -ENOMEM;
434 }
435
436 memcpy(imp->buf + imp->filled, data, size);
437 imp->filled += size;
438
439 return 0;
440}
441
442void journal_importer_drop_iovw(JournalImporter *imp) {
443 size_t remain, target;
444
445 /* This function drops processed data that along with the iovw that points at it */
446
447 iovw_free_contents(&imp->iovw);
448
449 /* possibly reset buffer position */
450 remain = imp->filled - imp->offset;
451
452 if (remain == 0) /* no brainer */
453 imp->offset = imp->scanned = imp->filled = 0;
454 else if (imp->offset > imp->size - imp->filled &&
455 imp->offset > remain) {
456 memcpy(imp->buf, imp->buf + imp->offset, remain);
457 imp->offset = imp->scanned = 0;
458 imp->filled = remain;
459 }
460
461 target = imp->size;
462 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
463 target /= 2;
464 if (target < imp->size) {
465 char *tmp;
466
467 tmp = realloc(imp->buf, target);
468 if (!tmp)
469 log_warning("Failed to reallocate buffer to (smaller) size %zu",
470 target);
471 else {
472 log_debug("Reallocated buffer from %zu to %zu bytes",
473 imp->size, target);
474 imp->buf = tmp;
475 imp->size = target;
476 }
477 }
478}
479
480bool journal_importer_eof(const JournalImporter *imp) {
481 return imp->state == IMPORTER_STATE_EOF;
482}