]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/basic/journal-importer.c
Merge pull request #5531 from yuwata/mdns
[thirdparty/systemd.git] / src / basic / journal-importer.c
CommitLineData
b18453ed
ZJS
1/***
2 This file is part of systemd.
3
4 Copyright 2014 Zbigniew Jędrzejewski-Szmek
5
6 systemd is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 systemd is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with systemd; If not, see <http://www.gnu.org/licenses/>.
18***/
19
20#include <unistd.h>
21
22#include "alloc-util.h"
23#include "journal-importer.h"
24#include "fd-util.h"
25#include "parse-util.h"
26#include "string-util.h"
27
/* States of the importer's parsing state machine, in the order a binary
 * field passes through them. */
enum {
        IMPORTER_STATE_LINE        = 0, /* waiting to read, or reading line */
        IMPORTER_STATE_DATA_START  = 1, /* reading binary data header */
        IMPORTER_STATE_DATA        = 2, /* reading binary data */
        IMPORTER_STATE_DATA_FINISH = 3, /* expecting newline */
        IMPORTER_STATE_EOF         = 4, /* done */
};
35
36static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
37 if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
38 return log_oom();
39
40 iovw->iovec[iovw->count++] = (struct iovec) {data, len};
41 return 0;
42}
43
44static void iovw_free_contents(struct iovec_wrapper *iovw) {
45 iovw->iovec = mfree(iovw->iovec);
46 iovw->size_bytes = iovw->count = 0;
47}
48
49static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
50 size_t i;
51
52 for (i = 0; i < iovw->count; i++)
53 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
54}
55
56size_t iovw_size(struct iovec_wrapper *iovw) {
57 size_t n = 0, i;
58
59 for (i = 0; i < iovw->count; i++)
60 n += iovw->iovec[i].iov_len;
61
62 return n;
63}
64
65void journal_importer_cleanup(JournalImporter *imp) {
66 if (imp->fd >= 0 && !imp->passive_fd) {
67 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
68 safe_close(imp->fd);
69 }
70
71 free(imp->buf);
72 iovw_free_contents(&imp->iovw);
73}
74
75static char* realloc_buffer(JournalImporter *imp, size_t size) {
76 char *b, *old = imp->buf;
77
78 b = GREEDY_REALLOC(imp->buf, imp->size, size);
79 if (!b)
80 return NULL;
81
82 iovw_rebase(&imp->iovw, old, imp->buf);
83
84 return b;
85}
86
87static int get_line(JournalImporter *imp, char **line, size_t *size) {
88 ssize_t n;
89 char *c = NULL;
90
91 assert(imp);
92 assert(imp->state == IMPORTER_STATE_LINE);
93 assert(imp->offset <= imp->filled);
94 assert(imp->filled <= imp->size);
95 assert(imp->buf == NULL || imp->size > 0);
96 assert(imp->fd >= 0);
97
98 for (;;) {
99 if (imp->buf) {
100 size_t start = MAX(imp->scanned, imp->offset);
101
102 c = memchr(imp->buf + start, '\n',
103 imp->filled - start);
104 if (c != NULL)
105 break;
106 }
107
108 imp->scanned = imp->filled;
109 if (imp->scanned >= DATA_SIZE_MAX) {
110 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
111 return -E2BIG;
112 }
113
114 if (imp->passive_fd)
115 /* we have to wait for some data to come to us */
116 return -EAGAIN;
117
118 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
119 we reallocate it, we'll increase the size at least a bit. */
120 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
121 if (imp->size - imp->filled < LINE_CHUNK &&
122 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
123 return log_oom();
124
125 assert(imp->buf);
126 assert(imp->size - imp->filled >= LINE_CHUNK ||
127 imp->size == ENTRY_SIZE_MAX);
128
129 n = read(imp->fd,
130 imp->buf + imp->filled,
131 imp->size - imp->filled);
132 if (n < 0) {
133 if (errno != EAGAIN)
134 log_error_errno(errno, "read(%d, ..., %zu): %m",
135 imp->fd,
136 imp->size - imp->filled);
137 return -errno;
138 } else if (n == 0)
139 return 0;
140
141 imp->filled += n;
142 }
143
144 *line = imp->buf + imp->offset;
145 *size = c + 1 - imp->buf - imp->offset;
146 imp->offset += *size;
147
148 return 1;
149}
150
151static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
152
153 assert(imp);
154 assert(imp->state == IMPORTER_STATE_DATA_START ||
155 imp->state == IMPORTER_STATE_DATA ||
156 imp->state == IMPORTER_STATE_DATA_FINISH);
157 assert(size <= DATA_SIZE_MAX);
158 assert(imp->offset <= imp->filled);
159 assert(imp->filled <= imp->size);
160 assert(imp->buf != NULL || imp->size == 0);
161 assert(imp->buf == NULL || imp->size > 0);
162 assert(imp->fd >= 0);
163 assert(data);
164
165 while (imp->filled - imp->offset < size) {
166 int n;
167
168 if (imp->passive_fd)
169 /* we have to wait for some data to come to us */
170 return -EAGAIN;
171
172 if (!realloc_buffer(imp, imp->offset + size))
173 return log_oom();
174
175 n = read(imp->fd, imp->buf + imp->filled,
176 imp->size - imp->filled);
177 if (n < 0) {
178 if (errno != EAGAIN)
179 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
180 imp->size - imp->filled);
181 return -errno;
182 } else if (n == 0)
183 return 0;
184
185 imp->filled += n;
186 }
187
188 *data = imp->buf + imp->offset;
189 imp->offset += size;
190
191 return 1;
192}
193
194static int get_data_size(JournalImporter *imp) {
195 int r;
196 void *data;
197
198 assert(imp);
199 assert(imp->state == IMPORTER_STATE_DATA_START);
200 assert(imp->data_size == 0);
201
202 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
203 if (r <= 0)
204 return r;
205
206 imp->data_size = le64toh( *(uint64_t *) data );
207 if (imp->data_size > DATA_SIZE_MAX) {
208 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
209 imp->data_size, DATA_SIZE_MAX);
210 return -EINVAL;
211 }
212 if (imp->data_size == 0)
213 log_warning("Binary field with zero length");
214
215 return 1;
216}
217
218static int get_data_data(JournalImporter *imp, void **data) {
219 int r;
220
221 assert(imp);
222 assert(data);
223 assert(imp->state == IMPORTER_STATE_DATA);
224
225 r = fill_fixed_size(imp, data, imp->data_size);
226 if (r <= 0)
227 return r;
228
229 return 1;
230}
231
232static int get_data_newline(JournalImporter *imp) {
233 int r;
234 char *data;
235
236 assert(imp);
237 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
238
239 r = fill_fixed_size(imp, (void**) &data, 1);
240 if (r <= 0)
241 return r;
242
243 assert(data);
244 if (*data != '\n') {
245 log_error("expected newline, got '%c'", *data);
246 return -EINVAL;
247 }
248
249 return 1;
250}
251
252static int process_dunder(JournalImporter *imp, char *line, size_t n) {
253 const char *timestamp;
254 int r;
255
256 assert(line);
257 assert(n > 0);
258 assert(line[n-1] == '\n');
259
260 /* XXX: is it worth to support timestamps in extended format?
261 * We don't produce them, but who knows... */
262
263 timestamp = startswith(line, "__CURSOR=");
264 if (timestamp)
265 /* ignore __CURSOR */
266 return 1;
267
268 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
269 if (timestamp) {
270 long long unsigned x;
271 line[n-1] = '\0';
272 r = safe_atollu(timestamp, &x);
273 if (r < 0)
274 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
275 else
276 imp->ts.realtime = x;
277 return r < 0 ? r : 1;
278 }
279
280 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
281 if (timestamp) {
282 long long unsigned x;
283 line[n-1] = '\0';
284 r = safe_atollu(timestamp, &x);
285 if (r < 0)
286 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
287 else
288 imp->ts.monotonic = x;
289 return r < 0 ? r : 1;
290 }
291
292 timestamp = startswith(line, "__");
293 if (timestamp) {
294 log_notice("Unknown dunder line %s", line);
295 return 1;
296 }
297
298 /* no dunder */
299 return 0;
300}
301
302int journal_importer_process_data(JournalImporter *imp) {
303 int r;
304
305 switch(imp->state) {
306 case IMPORTER_STATE_LINE: {
307 char *line, *sep;
308 size_t n = 0;
309
310 assert(imp->data_size == 0);
311
312 r = get_line(imp, &line, &n);
313 if (r < 0)
314 return r;
315 if (r == 0) {
316 imp->state = IMPORTER_STATE_EOF;
d74dc4f2 317 return 0;
b18453ed
ZJS
318 }
319 assert(n > 0);
320 assert(line[n-1] == '\n');
321
322 if (n == 1) {
323 log_trace("Received empty line, event is ready");
324 return 1;
325 }
326
327 r = process_dunder(imp, line, n);
328 if (r != 0)
329 return r < 0 ? r : 0;
330
331 /* MESSAGE=xxx\n
332 or
333 COREDUMP\n
334 LLLLLLLL0011223344...\n
335 */
336 sep = memchr(line, '=', n);
337 if (sep) {
338 /* chomp newline */
339 n--;
340
341 r = iovw_put(&imp->iovw, line, n);
342 if (r < 0)
343 return r;
344 } else {
345 /* replace \n with = */
346 line[n-1] = '=';
347
348 imp->field_len = n;
349 imp->state = IMPORTER_STATE_DATA_START;
350
351 /* we cannot put the field in iovec until we have all data */
352 }
353
354 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
355
356 return 0; /* continue */
357 }
358
359 case IMPORTER_STATE_DATA_START:
360 assert(imp->data_size == 0);
361
362 r = get_data_size(imp);
363 // log_debug("get_data_size() -> %d", r);
364 if (r < 0)
365 return r;
366 if (r == 0) {
367 imp->state = IMPORTER_STATE_EOF;
368 return 0;
369 }
370
371 imp->state = imp->data_size > 0 ?
372 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
373
374 return 0; /* continue */
375
376 case IMPORTER_STATE_DATA: {
377 void *data;
378 char *field;
379
380 assert(imp->data_size > 0);
381
382 r = get_data_data(imp, &data);
383 // log_debug("get_data_data() -> %d", r);
384 if (r < 0)
385 return r;
386 if (r == 0) {
387 imp->state = IMPORTER_STATE_EOF;
388 return 0;
389 }
390
391 assert(data);
392
393 field = (char*) data - sizeof(uint64_t) - imp->field_len;
394 memmove(field + sizeof(uint64_t), field, imp->field_len);
395
396 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
397 if (r < 0)
398 return r;
399
400 imp->state = IMPORTER_STATE_DATA_FINISH;
401
402 return 0; /* continue */
403 }
404
405 case IMPORTER_STATE_DATA_FINISH:
406 r = get_data_newline(imp);
407 // log_debug("get_data_newline() -> %d", r);
408 if (r < 0)
409 return r;
410 if (r == 0) {
411 imp->state = IMPORTER_STATE_EOF;
412 return 0;
413 }
414
415 imp->data_size = 0;
416 imp->state = IMPORTER_STATE_LINE;
417
418 return 0; /* continue */
419 default:
420 assert_not_reached("wtf?");
421 }
422}
423
424int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
425 assert(imp);
426 assert(imp->state != IMPORTER_STATE_EOF);
427
428 if (!realloc_buffer(imp, imp->filled + size)) {
429 log_error("Failed to store received data of size %zu "
430 "(in addition to existing %zu bytes with %zu filled): %s",
431 size, imp->size, imp->filled, strerror(ENOMEM));
432 return -ENOMEM;
433 }
434
435 memcpy(imp->buf + imp->filled, data, size);
436 imp->filled += size;
437
438 return 0;
439}
440
441void journal_importer_drop_iovw(JournalImporter *imp) {
442 size_t remain, target;
443
444 /* This function drops processed data that along with the iovw that points at it */
445
446 iovw_free_contents(&imp->iovw);
447
448 /* possibly reset buffer position */
449 remain = imp->filled - imp->offset;
450
451 if (remain == 0) /* no brainer */
452 imp->offset = imp->scanned = imp->filled = 0;
453 else if (imp->offset > imp->size - imp->filled &&
454 imp->offset > remain) {
455 memcpy(imp->buf, imp->buf + imp->offset, remain);
456 imp->offset = imp->scanned = 0;
457 imp->filled = remain;
458 }
459
460 target = imp->size;
461 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
462 target /= 2;
463 if (target < imp->size) {
464 char *tmp;
465
466 tmp = realloc(imp->buf, target);
467 if (!tmp)
468 log_warning("Failed to reallocate buffer to (smaller) size %zu",
469 target);
470 else {
471 log_debug("Reallocated buffer from %zu to %zu bytes",
472 imp->size, target);
473 imp->buf = tmp;
474 imp->size = target;
475 }
476 }
477}
478
479bool journal_importer_eof(const JournalImporter *imp) {
480 return imp->state == IMPORTER_STATE_EOF;
481}