]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/basic/journal-importer.c
job: change result field for log message about job result RESULT= → JOB_RESULT=
[thirdparty/systemd.git] / src / basic / journal-importer.c
CommitLineData
b18453ed
ZJS
1/***
2 This file is part of systemd.
3
4 Copyright 2014 Zbigniew Jędrzejewski-Szmek
5
6 systemd is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 systemd is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with systemd; If not, see <http://www.gnu.org/licenses/>.
18***/
19
20#include <unistd.h>
21
22#include "alloc-util.h"
23#include "journal-importer.h"
24#include "fd-util.h"
25#include "parse-util.h"
26#include "string-util.h"
f652c62d 27#include "unaligned.h"
b18453ed
ZJS
28
29enum {
30 IMPORTER_STATE_LINE = 0, /* waiting to read, or reading line */
31 IMPORTER_STATE_DATA_START, /* reading binary data header */
32 IMPORTER_STATE_DATA, /* reading binary data */
33 IMPORTER_STATE_DATA_FINISH, /* expecting newline */
34 IMPORTER_STATE_EOF, /* done */
35};
36
37static int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
38 if (!GREEDY_REALLOC(iovw->iovec, iovw->size_bytes, iovw->count + 1))
39 return log_oom();
40
41 iovw->iovec[iovw->count++] = (struct iovec) {data, len};
42 return 0;
43}
44
45static void iovw_free_contents(struct iovec_wrapper *iovw) {
46 iovw->iovec = mfree(iovw->iovec);
47 iovw->size_bytes = iovw->count = 0;
48}
49
50static void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
51 size_t i;
52
53 for (i = 0; i < iovw->count; i++)
54 iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
55}
56
57size_t iovw_size(struct iovec_wrapper *iovw) {
58 size_t n = 0, i;
59
60 for (i = 0; i < iovw->count; i++)
61 n += iovw->iovec[i].iov_len;
62
63 return n;
64}
65
66void journal_importer_cleanup(JournalImporter *imp) {
67 if (imp->fd >= 0 && !imp->passive_fd) {
68 log_debug("Closing %s (fd=%d)", imp->name ?: "importer", imp->fd);
69 safe_close(imp->fd);
70 }
71
2ddb70d2 72 free(imp->name);
b18453ed
ZJS
73 free(imp->buf);
74 iovw_free_contents(&imp->iovw);
75}
76
77static char* realloc_buffer(JournalImporter *imp, size_t size) {
78 char *b, *old = imp->buf;
79
80 b = GREEDY_REALLOC(imp->buf, imp->size, size);
81 if (!b)
82 return NULL;
83
84 iovw_rebase(&imp->iovw, old, imp->buf);
85
86 return b;
87}
88
89static int get_line(JournalImporter *imp, char **line, size_t *size) {
90 ssize_t n;
91 char *c = NULL;
92
93 assert(imp);
94 assert(imp->state == IMPORTER_STATE_LINE);
95 assert(imp->offset <= imp->filled);
96 assert(imp->filled <= imp->size);
97 assert(imp->buf == NULL || imp->size > 0);
98 assert(imp->fd >= 0);
99
100 for (;;) {
101 if (imp->buf) {
102 size_t start = MAX(imp->scanned, imp->offset);
103
104 c = memchr(imp->buf + start, '\n',
105 imp->filled - start);
106 if (c != NULL)
107 break;
108 }
109
110 imp->scanned = imp->filled;
111 if (imp->scanned >= DATA_SIZE_MAX) {
112 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
113 return -E2BIG;
114 }
115
116 if (imp->passive_fd)
117 /* we have to wait for some data to come to us */
118 return -EAGAIN;
119
120 /* We know that imp->filled is at most DATA_SIZE_MAX, so if
121 we reallocate it, we'll increase the size at least a bit. */
122 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
123 if (imp->size - imp->filled < LINE_CHUNK &&
124 !realloc_buffer(imp, MIN(imp->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
125 return log_oom();
126
127 assert(imp->buf);
128 assert(imp->size - imp->filled >= LINE_CHUNK ||
129 imp->size == ENTRY_SIZE_MAX);
130
131 n = read(imp->fd,
132 imp->buf + imp->filled,
133 imp->size - imp->filled);
134 if (n < 0) {
135 if (errno != EAGAIN)
136 log_error_errno(errno, "read(%d, ..., %zu): %m",
137 imp->fd,
138 imp->size - imp->filled);
139 return -errno;
140 } else if (n == 0)
141 return 0;
142
143 imp->filled += n;
144 }
145
146 *line = imp->buf + imp->offset;
147 *size = c + 1 - imp->buf - imp->offset;
148 imp->offset += *size;
149
150 return 1;
151}
152
153static int fill_fixed_size(JournalImporter *imp, void **data, size_t size) {
154
155 assert(imp);
156 assert(imp->state == IMPORTER_STATE_DATA_START ||
157 imp->state == IMPORTER_STATE_DATA ||
158 imp->state == IMPORTER_STATE_DATA_FINISH);
159 assert(size <= DATA_SIZE_MAX);
160 assert(imp->offset <= imp->filled);
161 assert(imp->filled <= imp->size);
162 assert(imp->buf != NULL || imp->size == 0);
163 assert(imp->buf == NULL || imp->size > 0);
164 assert(imp->fd >= 0);
165 assert(data);
166
167 while (imp->filled - imp->offset < size) {
168 int n;
169
170 if (imp->passive_fd)
171 /* we have to wait for some data to come to us */
172 return -EAGAIN;
173
174 if (!realloc_buffer(imp, imp->offset + size))
175 return log_oom();
176
177 n = read(imp->fd, imp->buf + imp->filled,
178 imp->size - imp->filled);
179 if (n < 0) {
180 if (errno != EAGAIN)
181 log_error_errno(errno, "read(%d, ..., %zu): %m", imp->fd,
182 imp->size - imp->filled);
183 return -errno;
184 } else if (n == 0)
185 return 0;
186
187 imp->filled += n;
188 }
189
190 *data = imp->buf + imp->offset;
191 imp->offset += size;
192
193 return 1;
194}
195
196static int get_data_size(JournalImporter *imp) {
197 int r;
198 void *data;
199
200 assert(imp);
201 assert(imp->state == IMPORTER_STATE_DATA_START);
202 assert(imp->data_size == 0);
203
204 r = fill_fixed_size(imp, &data, sizeof(uint64_t));
205 if (r <= 0)
206 return r;
207
f652c62d 208 imp->data_size = unaligned_read_le64(data);
b18453ed
ZJS
209 if (imp->data_size > DATA_SIZE_MAX) {
210 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
211 imp->data_size, DATA_SIZE_MAX);
212 return -EINVAL;
213 }
214 if (imp->data_size == 0)
215 log_warning("Binary field with zero length");
216
217 return 1;
218}
219
220static int get_data_data(JournalImporter *imp, void **data) {
221 int r;
222
223 assert(imp);
224 assert(data);
225 assert(imp->state == IMPORTER_STATE_DATA);
226
227 r = fill_fixed_size(imp, data, imp->data_size);
228 if (r <= 0)
229 return r;
230
231 return 1;
232}
233
234static int get_data_newline(JournalImporter *imp) {
235 int r;
236 char *data;
237
238 assert(imp);
239 assert(imp->state == IMPORTER_STATE_DATA_FINISH);
240
241 r = fill_fixed_size(imp, (void**) &data, 1);
242 if (r <= 0)
243 return r;
244
245 assert(data);
246 if (*data != '\n') {
247 log_error("expected newline, got '%c'", *data);
248 return -EINVAL;
249 }
250
251 return 1;
252}
253
254static int process_dunder(JournalImporter *imp, char *line, size_t n) {
255 const char *timestamp;
256 int r;
257
258 assert(line);
259 assert(n > 0);
260 assert(line[n-1] == '\n');
261
262 /* XXX: is it worth to support timestamps in extended format?
263 * We don't produce them, but who knows... */
264
265 timestamp = startswith(line, "__CURSOR=");
266 if (timestamp)
267 /* ignore __CURSOR */
268 return 1;
269
270 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
271 if (timestamp) {
272 long long unsigned x;
273 line[n-1] = '\0';
274 r = safe_atollu(timestamp, &x);
275 if (r < 0)
276 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
277 else
278 imp->ts.realtime = x;
279 return r < 0 ? r : 1;
280 }
281
282 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
283 if (timestamp) {
284 long long unsigned x;
285 line[n-1] = '\0';
286 r = safe_atollu(timestamp, &x);
287 if (r < 0)
288 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
289 else
290 imp->ts.monotonic = x;
291 return r < 0 ? r : 1;
292 }
293
294 timestamp = startswith(line, "__");
295 if (timestamp) {
296 log_notice("Unknown dunder line %s", line);
297 return 1;
298 }
299
300 /* no dunder */
301 return 0;
302}
303
304int journal_importer_process_data(JournalImporter *imp) {
305 int r;
306
307 switch(imp->state) {
308 case IMPORTER_STATE_LINE: {
309 char *line, *sep;
310 size_t n = 0;
311
312 assert(imp->data_size == 0);
313
314 r = get_line(imp, &line, &n);
315 if (r < 0)
316 return r;
317 if (r == 0) {
318 imp->state = IMPORTER_STATE_EOF;
d74dc4f2 319 return 0;
b18453ed
ZJS
320 }
321 assert(n > 0);
322 assert(line[n-1] == '\n');
323
324 if (n == 1) {
325 log_trace("Received empty line, event is ready");
326 return 1;
327 }
328
329 r = process_dunder(imp, line, n);
330 if (r != 0)
331 return r < 0 ? r : 0;
332
333 /* MESSAGE=xxx\n
334 or
335 COREDUMP\n
336 LLLLLLLL0011223344...\n
337 */
338 sep = memchr(line, '=', n);
339 if (sep) {
340 /* chomp newline */
341 n--;
342
343 r = iovw_put(&imp->iovw, line, n);
344 if (r < 0)
345 return r;
346 } else {
347 /* replace \n with = */
348 line[n-1] = '=';
349
350 imp->field_len = n;
351 imp->state = IMPORTER_STATE_DATA_START;
352
353 /* we cannot put the field in iovec until we have all data */
354 }
355
356 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
357
358 return 0; /* continue */
359 }
360
361 case IMPORTER_STATE_DATA_START:
362 assert(imp->data_size == 0);
363
364 r = get_data_size(imp);
365 // log_debug("get_data_size() -> %d", r);
366 if (r < 0)
367 return r;
368 if (r == 0) {
369 imp->state = IMPORTER_STATE_EOF;
370 return 0;
371 }
372
373 imp->state = imp->data_size > 0 ?
374 IMPORTER_STATE_DATA : IMPORTER_STATE_DATA_FINISH;
375
376 return 0; /* continue */
377
378 case IMPORTER_STATE_DATA: {
379 void *data;
380 char *field;
381
382 assert(imp->data_size > 0);
383
384 r = get_data_data(imp, &data);
385 // log_debug("get_data_data() -> %d", r);
386 if (r < 0)
387 return r;
388 if (r == 0) {
389 imp->state = IMPORTER_STATE_EOF;
390 return 0;
391 }
392
393 assert(data);
394
395 field = (char*) data - sizeof(uint64_t) - imp->field_len;
396 memmove(field + sizeof(uint64_t), field, imp->field_len);
397
398 r = iovw_put(&imp->iovw, field + sizeof(uint64_t), imp->field_len + imp->data_size);
399 if (r < 0)
400 return r;
401
402 imp->state = IMPORTER_STATE_DATA_FINISH;
403
404 return 0; /* continue */
405 }
406
407 case IMPORTER_STATE_DATA_FINISH:
408 r = get_data_newline(imp);
409 // log_debug("get_data_newline() -> %d", r);
410 if (r < 0)
411 return r;
412 if (r == 0) {
413 imp->state = IMPORTER_STATE_EOF;
414 return 0;
415 }
416
417 imp->data_size = 0;
418 imp->state = IMPORTER_STATE_LINE;
419
420 return 0; /* continue */
421 default:
422 assert_not_reached("wtf?");
423 }
424}
425
426int journal_importer_push_data(JournalImporter *imp, const char *data, size_t size) {
427 assert(imp);
428 assert(imp->state != IMPORTER_STATE_EOF);
429
430 if (!realloc_buffer(imp, imp->filled + size)) {
431 log_error("Failed to store received data of size %zu "
432 "(in addition to existing %zu bytes with %zu filled): %s",
433 size, imp->size, imp->filled, strerror(ENOMEM));
434 return -ENOMEM;
435 }
436
437 memcpy(imp->buf + imp->filled, data, size);
438 imp->filled += size;
439
440 return 0;
441}
442
443void journal_importer_drop_iovw(JournalImporter *imp) {
444 size_t remain, target;
445
446 /* This function drops processed data that along with the iovw that points at it */
447
448 iovw_free_contents(&imp->iovw);
449
450 /* possibly reset buffer position */
451 remain = imp->filled - imp->offset;
452
453 if (remain == 0) /* no brainer */
454 imp->offset = imp->scanned = imp->filled = 0;
455 else if (imp->offset > imp->size - imp->filled &&
456 imp->offset > remain) {
457 memcpy(imp->buf, imp->buf + imp->offset, remain);
458 imp->offset = imp->scanned = 0;
459 imp->filled = remain;
460 }
461
462 target = imp->size;
463 while (target > 16 * LINE_CHUNK && imp->filled < target / 2)
464 target /= 2;
465 if (target < imp->size) {
466 char *tmp;
467
468 tmp = realloc(imp->buf, target);
469 if (!tmp)
470 log_warning("Failed to reallocate buffer to (smaller) size %zu",
471 target);
472 else {
473 log_debug("Reallocated buffer from %zu to %zu bytes",
474 imp->size, target);
475 imp->buf = tmp;
476 imp->size = target;
477 }
478 }
479}
480
481bool journal_importer_eof(const JournalImporter *imp) {
482 return imp->state == IMPORTER_STATE_EOF;
483}