]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-remote-parse.c
po: Initial Hungarian translation
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
fdfccdbc
ZJS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include "journal-remote-parse.h"
23#include "journald-native.h"
24
/* Step (in bytes) used when growing the line buffer and as the unit for the
 * buffer-shrink threshold. The expansion is parenthesized so that it stays a
 * single operand inside any surrounding expression (division, modulo, ...). */
#define LINE_CHUNK (8*1024u)
fdfccdbc
ZJS
26
27void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
9ff48d09 31 if (source->fd >= 0 && !source->passive_fd) {
fdfccdbc 32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
8201af08 33 safe_close(source->fd);
fdfccdbc 34 }
9ff48d09 35
fdfccdbc
ZJS
36 free(source->name);
37 free(source->buf);
38 iovw_free_contents(&source->iovw);
8201af08 39
9ff48d09
ZJS
40 log_debug("Writer ref count %u", source->writer->n_ref);
41 writer_unref(source->writer);
42
8201af08
ZJS
43 sd_event_source_unref(source->event);
44
fdfccdbc
ZJS
45 free(source);
46}
47
9ff48d09
ZJS
48/**
49 * Initialize zero-filled source with given values. On success, takes
50 * ownerhship of fd and writer, otherwise does not touch them.
51 */
52RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
53
54 RemoteSource *source;
55
56 log_debug("Creating source for %sfd:%d (%s)",
57 passive_fd ? "passive " : "", fd, name);
58
59 assert(fd >= 0);
60
61 source = new0(RemoteSource, 1);
62 if (!source)
63 return NULL;
64
65 source->fd = fd;
66 source->passive_fd = passive_fd;
67 source->name = name;
68 source->writer = writer;
69
70 return source;
71}
72
92b10cbc
ZJS
73static char* realloc_buffer(RemoteSource *source, size_t size) {
74 char *b, *old = source->buf;
75
76 b = GREEDY_REALLOC(source->buf, source->size, size);
77 if (!b)
78 return NULL;
79
80 iovw_rebase(&source->iovw, old, source->buf);
81
82 return b;
83}
84
fdfccdbc 85static int get_line(RemoteSource *source, char **line, size_t *size) {
92b10cbc 86 ssize_t n;
60921179 87 char *c = NULL;
fdfccdbc
ZJS
88
89 assert(source);
90 assert(source->state == STATE_LINE);
92b10cbc 91 assert(source->offset <= source->filled);
fdfccdbc
ZJS
92 assert(source->filled <= source->size);
93 assert(source->buf == NULL || source->size > 0);
9ff48d09 94 assert(source->fd >= 0);
fdfccdbc 95
851d4e2a 96 while (true) {
92b10cbc
ZJS
97 if (source->buf) {
98 size_t start = MAX(source->scanned, source->offset);
99
100 c = memchr(source->buf + start, '\n',
101 source->filled - start);
102 if (c != NULL)
103 break;
104 }
851d4e2a
ZJS
105
106 source->scanned = source->filled;
107 if (source->scanned >= DATA_SIZE_MAX) {
108 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
109 return -E2BIG;
110 }
60921179 111
9ff48d09 112 if (source->passive_fd)
851d4e2a
ZJS
113 /* we have to wait for some data to come to us */
114 return -EWOULDBLOCK;
fdfccdbc 115
851d4e2a 116 if (source->size - source->filled < LINE_CHUNK &&
92b10cbc
ZJS
117 !realloc_buffer(source,
118 MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
851d4e2a 119 return log_oom();
9786767a 120
4a0a6ac0 121 assert(source->size - source->filled >= LINE_CHUNK ||
92b10cbc 122 source->size == ENTRY_SIZE_MAX);
fdfccdbc 123
851d4e2a 124 n = read(source->fd, source->buf + source->filled,
4a0a6ac0 125 source->size - source->filled);
851d4e2a
ZJS
126 if (n < 0) {
127 if (errno != EAGAIN && errno != EWOULDBLOCK)
56f64d95 128 log_error_errno(errno, "read(%d, ..., %zd): %m", source->fd,
851d4e2a
ZJS
129 source->size - source->filled);
130 return -errno;
131 } else if (n == 0)
132 return 0;
fdfccdbc 133
851d4e2a
ZJS
134 source->filled += n;
135 }
fdfccdbc 136
92b10cbc
ZJS
137 *line = source->buf + source->offset;
138 *size = c + 1 - source->buf - source->offset;
139 source->offset += *size;
fdfccdbc
ZJS
140
141 return 1;
142}
143
cc64d017
ZJS
144int push_data(RemoteSource *source, const char *data, size_t size) {
145 assert(source);
146 assert(source->state != STATE_EOF);
147
92b10cbc 148 if (!realloc_buffer(source, source->filled + size)) {
851d4e2a
ZJS
149 log_error("Failed to store received data of size %zu "
150 "(in addition to existing %zu bytes with %zu filled): %s",
151 size, source->size, source->filled, strerror(ENOMEM));
152 return -ENOMEM;
153 }
cc64d017
ZJS
154
155 memcpy(source->buf + source->filled, data, size);
156 source->filled += size;
157
158 return 0;
159}
160
fdfccdbc 161static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
fdfccdbc
ZJS
162
163 assert(source);
164 assert(source->state == STATE_DATA_START ||
165 source->state == STATE_DATA ||
166 source->state == STATE_DATA_FINISH);
167 assert(size <= DATA_SIZE_MAX);
92b10cbc 168 assert(source->offset <= source->filled);
fdfccdbc
ZJS
169 assert(source->filled <= source->size);
170 assert(source->buf != NULL || source->size == 0);
171 assert(source->buf == NULL || source->size > 0);
9ff48d09 172 assert(source->fd >= 0);
fdfccdbc
ZJS
173 assert(data);
174
92b10cbc
ZJS
175 while (source->filled - source->offset < size) {
176 int n;
177
9ff48d09 178 if (source->passive_fd)
9786767a
ZJS
179 /* we have to wait for some data to come to us */
180 return -EWOULDBLOCK;
181
92b10cbc 182 if (!realloc_buffer(source, source->offset + size))
fdfccdbc
ZJS
183 return log_oom();
184
185 n = read(source->fd, source->buf + source->filled,
186 source->size - source->filled);
187 if (n < 0) {
188 if (errno != EAGAIN && errno != EWOULDBLOCK)
56f64d95 189 log_error_errno(errno, "read(%d, ..., %zd): %m", source->fd,
fdfccdbc
ZJS
190 source->size - source->filled);
191 return -errno;
192 } else if (n == 0)
193 return 0;
194
195 source->filled += n;
196 }
197
92b10cbc
ZJS
198 *data = source->buf + source->offset;
199 source->offset += size;
fdfccdbc
ZJS
200
201 return 1;
202}
203
204static int get_data_size(RemoteSource *source) {
205 int r;
92b10cbc 206 void *data;
fdfccdbc
ZJS
207
208 assert(source);
209 assert(source->state == STATE_DATA_START);
210 assert(source->data_size == 0);
211
212 r = fill_fixed_size(source, &data, sizeof(uint64_t));
213 if (r <= 0)
214 return r;
215
216 source->data_size = le64toh( *(uint64_t *) data );
217 if (source->data_size > DATA_SIZE_MAX) {
a83f4037 218 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
fdfccdbc
ZJS
219 source->data_size, DATA_SIZE_MAX);
220 return -EINVAL;
221 }
222 if (source->data_size == 0)
223 log_warning("Binary field with zero length");
224
225 return 1;
226}
227
228static int get_data_data(RemoteSource *source, void **data) {
229 int r;
230
231 assert(source);
232 assert(data);
233 assert(source->state == STATE_DATA);
234
235 r = fill_fixed_size(source, data, source->data_size);
236 if (r <= 0)
237 return r;
238
239 return 1;
240}
241
242static int get_data_newline(RemoteSource *source) {
243 int r;
92b10cbc 244 char *data;
fdfccdbc
ZJS
245
246 assert(source);
247 assert(source->state == STATE_DATA_FINISH);
248
249 r = fill_fixed_size(source, (void**) &data, 1);
250 if (r <= 0)
251 return r;
252
253 assert(data);
254 if (*data != '\n') {
255 log_error("expected newline, got '%c'", *data);
256 return -EINVAL;
257 }
258
259 return 1;
260}
261
262static int process_dunder(RemoteSource *source, char *line, size_t n) {
263 const char *timestamp;
264 int r;
265
266 assert(line);
267 assert(n > 0);
268 assert(line[n-1] == '\n');
269
270 /* XXX: is it worth to support timestamps in extended format?
271 * We don't produce them, but who knows... */
272
273 timestamp = startswith(line, "__CURSOR=");
274 if (timestamp)
275 /* ignore __CURSOR */
276 return 1;
277
278 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
279 if (timestamp) {
280 long long unsigned x;
281 line[n-1] = '\0';
282 r = safe_atollu(timestamp, &x);
283 if (r < 0)
284 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
285 else
286 source->ts.realtime = x;
287 return r < 0 ? r : 1;
288 }
289
290 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
291 if (timestamp) {
292 long long unsigned x;
293 line[n-1] = '\0';
294 r = safe_atollu(timestamp, &x);
295 if (r < 0)
296 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
297 else
298 source->ts.monotonic = x;
299 return r < 0 ? r : 1;
300 }
301
302 timestamp = startswith(line, "__");
303 if (timestamp) {
304 log_notice("Unknown dunder line %s", line);
305 return 1;
306 }
307
308 /* no dunder */
309 return 0;
310}
311
312int process_data(RemoteSource *source) {
313 int r;
314
315 switch(source->state) {
316 case STATE_LINE: {
317 char *line, *sep;
318 size_t n;
319
320 assert(source->data_size == 0);
321
322 r = get_line(source, &line, &n);
323 if (r < 0)
324 return r;
325 if (r == 0) {
326 source->state = STATE_EOF;
327 return r;
328 }
329 assert(n > 0);
330 assert(line[n-1] == '\n');
331
332 if (n == 1) {
cb41ff29 333 log_trace("Received empty line, event is ready");
fdfccdbc
ZJS
334 return 1;
335 }
336
337 r = process_dunder(source, line, n);
92b10cbc 338 if (r != 0)
fdfccdbc 339 return r < 0 ? r : 0;
fdfccdbc
ZJS
340
341 /* MESSAGE=xxx\n
342 or
343 COREDUMP\n
344 LLLLLLLL0011223344...\n
345 */
346 sep = memchr(line, '=', n);
347 if (sep)
348 /* chomp newline */
349 n--;
350 else
351 /* replace \n with = */
352 line[n-1] = '=';
cb41ff29 353 log_trace("Received: %.*s", (int) n, line);
fdfccdbc
ZJS
354
355 r = iovw_put(&source->iovw, line, n);
356 if (r < 0) {
357 log_error("Failed to put line in iovect");
fdfccdbc
ZJS
358 return r;
359 }
360
361 if (!sep)
362 source->state = STATE_DATA_START;
363 return 0; /* continue */
364 }
365
366 case STATE_DATA_START:
367 assert(source->data_size == 0);
368
369 r = get_data_size(source);
dd87b184 370 // log_debug("get_data_size() -> %d", r);
fdfccdbc
ZJS
371 if (r < 0)
372 return r;
373 if (r == 0) {
374 source->state = STATE_EOF;
375 return 0;
376 }
377
378 source->state = source->data_size > 0 ?
379 STATE_DATA : STATE_DATA_FINISH;
380
381 return 0; /* continue */
382
383 case STATE_DATA: {
384 void *data;
385
386 assert(source->data_size > 0);
387
388 r = get_data_data(source, &data);
dd87b184 389 // log_debug("get_data_data() -> %d", r);
fdfccdbc
ZJS
390 if (r < 0)
391 return r;
392 if (r == 0) {
393 source->state = STATE_EOF;
394 return 0;
395 }
396
397 assert(data);
398
399 r = iovw_put(&source->iovw, data, source->data_size);
400 if (r < 0) {
401 log_error("failed to put binary buffer in iovect");
402 return r;
403 }
404
405 source->state = STATE_DATA_FINISH;
406
407 return 0; /* continue */
408 }
409
410 case STATE_DATA_FINISH:
411 r = get_data_newline(source);
dd87b184 412 // log_debug("get_data_newline() -> %d", r);
fdfccdbc
ZJS
413 if (r < 0)
414 return r;
415 if (r == 0) {
416 source->state = STATE_EOF;
417 return 0;
418 }
419
420 source->data_size = 0;
421 source->state = STATE_LINE;
422
423 return 0; /* continue */
424 default:
425 assert_not_reached("wtf?");
426 }
427}
428
9ff48d09 429int process_source(RemoteSource *source, bool compress, bool seal) {
92b10cbc 430 size_t remain, target;
fdfccdbc
ZJS
431 int r;
432
433 assert(source);
9ff48d09 434 assert(source->writer);
fdfccdbc
ZJS
435
436 r = process_data(source);
437 if (r <= 0)
438 return r;
439
440 /* We have a full event */
cb41ff29 441 log_trace("Received a full event from source@%p fd:%d (%s)",
a83f4037 442 source, source->fd, source->name);
fdfccdbc
ZJS
443
444 if (!source->iovw.count) {
445 log_warning("Entry with no payload, skipping");
446 goto freeing;
447 }
448
449 assert(source->iovw.iovec);
450 assert(source->iovw.count);
451
9ff48d09 452 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
fdfccdbc 453 if (r < 0)
c33b3297
MS
454 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
455 iovw_size(&source->iovw));
fdfccdbc
ZJS
456 else
457 r = 1;
458
459 freeing:
460 iovw_free_contents(&source->iovw);
92b10cbc
ZJS
461
462 /* possibly reset buffer position */
463 remain = source->filled - source->offset;
464
465 if (remain == 0) /* no brainer */
466 source->offset = source->scanned = source->filled = 0;
467 else if (source->offset > source->size - source->filled &&
468 source->offset > remain) {
469 memcpy(source->buf, source->buf + source->offset, remain);
470 source->offset = source->scanned = 0;
471 source->filled = remain;
472 }
473
474 target = source->size;
475 while (target > 16 * LINE_CHUNK && remain < target / 2)
476 target /= 2;
477 if (target < source->size) {
478 char *tmp;
479
480 tmp = realloc(source->buf, target);
e4c38cc3 481 if (!tmp)
92b10cbc
ZJS
482 log_warning("Failed to reallocate buffer to (smaller) size %zu",
483 target);
484 else {
485 log_debug("Reallocated buffer from %zu to %zu bytes",
486 source->size, target);
487 source->buf = tmp;
488 source->size = target;
489 }
490 }
491
fdfccdbc
ZJS
492 return r;
493}