]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-remote-parse.c
tree-wide: update empty-if coccinelle script to cover empty-while and more
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
fdfccdbc
ZJS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include "journal-remote-parse.h"
23#include "journald-native.h"
24
4a0a6ac0 25#define LINE_CHUNK 8*1024u
fdfccdbc
ZJS
26
27void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
9ff48d09 31 if (source->fd >= 0 && !source->passive_fd) {
fdfccdbc 32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
8201af08 33 safe_close(source->fd);
fdfccdbc 34 }
9ff48d09 35
fdfccdbc
ZJS
36 free(source->name);
37 free(source->buf);
38 iovw_free_contents(&source->iovw);
8201af08 39
1fa2f38f 40 log_debug("Writer ref count %i", source->writer->n_ref);
9ff48d09
ZJS
41 writer_unref(source->writer);
42
8201af08 43 sd_event_source_unref(source->event);
043945b9 44 sd_event_source_unref(source->buffer_event);
8201af08 45
fdfccdbc
ZJS
46 free(source);
47}
48
9ff48d09
ZJS
49/**
50 * Initialize zero-filled source with given values. On success, takes
51 * ownerhship of fd and writer, otherwise does not touch them.
52 */
53RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
54
55 RemoteSource *source;
56
57 log_debug("Creating source for %sfd:%d (%s)",
58 passive_fd ? "passive " : "", fd, name);
59
60 assert(fd >= 0);
61
62 source = new0(RemoteSource, 1);
63 if (!source)
64 return NULL;
65
66 source->fd = fd;
67 source->passive_fd = passive_fd;
68 source->name = name;
69 source->writer = writer;
70
71 return source;
72}
73
92b10cbc
ZJS
74static char* realloc_buffer(RemoteSource *source, size_t size) {
75 char *b, *old = source->buf;
76
77 b = GREEDY_REALLOC(source->buf, source->size, size);
78 if (!b)
79 return NULL;
80
81 iovw_rebase(&source->iovw, old, source->buf);
82
83 return b;
84}
85
fdfccdbc 86static int get_line(RemoteSource *source, char **line, size_t *size) {
92b10cbc 87 ssize_t n;
60921179 88 char *c = NULL;
fdfccdbc
ZJS
89
90 assert(source);
91 assert(source->state == STATE_LINE);
92b10cbc 92 assert(source->offset <= source->filled);
fdfccdbc
ZJS
93 assert(source->filled <= source->size);
94 assert(source->buf == NULL || source->size > 0);
9ff48d09 95 assert(source->fd >= 0);
fdfccdbc 96
851d4e2a 97 while (true) {
92b10cbc
ZJS
98 if (source->buf) {
99 size_t start = MAX(source->scanned, source->offset);
100
101 c = memchr(source->buf + start, '\n',
102 source->filled - start);
103 if (c != NULL)
104 break;
105 }
851d4e2a
ZJS
106
107 source->scanned = source->filled;
108 if (source->scanned >= DATA_SIZE_MAX) {
109 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
110 return -E2BIG;
111 }
60921179 112
9ff48d09 113 if (source->passive_fd)
851d4e2a 114 /* we have to wait for some data to come to us */
ff55c3c7 115 return -EAGAIN;
fdfccdbc 116
39d0fd9c
ZJS
117 /* We know that source->filled is at most DATA_SIZE_MAX, so if
118 we reallocate it, we'll increase the size at least a bit. */
119 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
851d4e2a 120 if (source->size - source->filled < LINE_CHUNK &&
39d0fd9c 121 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
851d4e2a 122 return log_oom();
9786767a 123
39d0fd9c 124 assert(source->buf);
4a0a6ac0 125 assert(source->size - source->filled >= LINE_CHUNK ||
92b10cbc 126 source->size == ENTRY_SIZE_MAX);
fdfccdbc 127
39d0fd9c
ZJS
128 n = read(source->fd,
129 source->buf + source->filled,
4a0a6ac0 130 source->size - source->filled);
851d4e2a 131 if (n < 0) {
ff55c3c7 132 if (errno != EAGAIN)
39d0fd9c
ZJS
133 log_error_errno(errno, "read(%d, ..., %zu): %m",
134 source->fd,
1fa2f38f 135 source->size - source->filled);
851d4e2a
ZJS
136 return -errno;
137 } else if (n == 0)
138 return 0;
fdfccdbc 139
851d4e2a
ZJS
140 source->filled += n;
141 }
fdfccdbc 142
92b10cbc
ZJS
143 *line = source->buf + source->offset;
144 *size = c + 1 - source->buf - source->offset;
145 source->offset += *size;
fdfccdbc
ZJS
146
147 return 1;
148}
149
cc64d017
ZJS
150int push_data(RemoteSource *source, const char *data, size_t size) {
151 assert(source);
152 assert(source->state != STATE_EOF);
153
92b10cbc 154 if (!realloc_buffer(source, source->filled + size)) {
851d4e2a
ZJS
155 log_error("Failed to store received data of size %zu "
156 "(in addition to existing %zu bytes with %zu filled): %s",
157 size, source->size, source->filled, strerror(ENOMEM));
158 return -ENOMEM;
159 }
cc64d017
ZJS
160
161 memcpy(source->buf + source->filled, data, size);
162 source->filled += size;
163
164 return 0;
165}
166
fdfccdbc 167static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
fdfccdbc
ZJS
168
169 assert(source);
170 assert(source->state == STATE_DATA_START ||
171 source->state == STATE_DATA ||
172 source->state == STATE_DATA_FINISH);
173 assert(size <= DATA_SIZE_MAX);
92b10cbc 174 assert(source->offset <= source->filled);
fdfccdbc
ZJS
175 assert(source->filled <= source->size);
176 assert(source->buf != NULL || source->size == 0);
177 assert(source->buf == NULL || source->size > 0);
9ff48d09 178 assert(source->fd >= 0);
fdfccdbc
ZJS
179 assert(data);
180
92b10cbc
ZJS
181 while (source->filled - source->offset < size) {
182 int n;
183
9ff48d09 184 if (source->passive_fd)
9786767a 185 /* we have to wait for some data to come to us */
ff55c3c7 186 return -EAGAIN;
9786767a 187
92b10cbc 188 if (!realloc_buffer(source, source->offset + size))
fdfccdbc
ZJS
189 return log_oom();
190
191 n = read(source->fd, source->buf + source->filled,
192 source->size - source->filled);
193 if (n < 0) {
ff55c3c7 194 if (errno != EAGAIN)
1fa2f38f
ZJS
195 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
196 source->size - source->filled);
fdfccdbc
ZJS
197 return -errno;
198 } else if (n == 0)
199 return 0;
200
201 source->filled += n;
202 }
203
92b10cbc
ZJS
204 *data = source->buf + source->offset;
205 source->offset += size;
fdfccdbc
ZJS
206
207 return 1;
208}
209
210static int get_data_size(RemoteSource *source) {
211 int r;
92b10cbc 212 void *data;
fdfccdbc
ZJS
213
214 assert(source);
215 assert(source->state == STATE_DATA_START);
216 assert(source->data_size == 0);
217
218 r = fill_fixed_size(source, &data, sizeof(uint64_t));
219 if (r <= 0)
220 return r;
221
222 source->data_size = le64toh( *(uint64_t *) data );
223 if (source->data_size > DATA_SIZE_MAX) {
a83f4037 224 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
fdfccdbc
ZJS
225 source->data_size, DATA_SIZE_MAX);
226 return -EINVAL;
227 }
228 if (source->data_size == 0)
229 log_warning("Binary field with zero length");
230
231 return 1;
232}
233
234static int get_data_data(RemoteSource *source, void **data) {
235 int r;
236
237 assert(source);
238 assert(data);
239 assert(source->state == STATE_DATA);
240
241 r = fill_fixed_size(source, data, source->data_size);
242 if (r <= 0)
243 return r;
244
245 return 1;
246}
247
248static int get_data_newline(RemoteSource *source) {
249 int r;
92b10cbc 250 char *data;
fdfccdbc
ZJS
251
252 assert(source);
253 assert(source->state == STATE_DATA_FINISH);
254
255 r = fill_fixed_size(source, (void**) &data, 1);
256 if (r <= 0)
257 return r;
258
259 assert(data);
260 if (*data != '\n') {
261 log_error("expected newline, got '%c'", *data);
262 return -EINVAL;
263 }
264
265 return 1;
266}
267
268static int process_dunder(RemoteSource *source, char *line, size_t n) {
269 const char *timestamp;
270 int r;
271
272 assert(line);
273 assert(n > 0);
274 assert(line[n-1] == '\n');
275
276 /* XXX: is it worth to support timestamps in extended format?
277 * We don't produce them, but who knows... */
278
279 timestamp = startswith(line, "__CURSOR=");
280 if (timestamp)
281 /* ignore __CURSOR */
282 return 1;
283
284 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
285 if (timestamp) {
286 long long unsigned x;
287 line[n-1] = '\0';
288 r = safe_atollu(timestamp, &x);
289 if (r < 0)
290 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
291 else
292 source->ts.realtime = x;
293 return r < 0 ? r : 1;
294 }
295
296 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
297 if (timestamp) {
298 long long unsigned x;
299 line[n-1] = '\0';
300 r = safe_atollu(timestamp, &x);
301 if (r < 0)
302 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
303 else
304 source->ts.monotonic = x;
305 return r < 0 ? r : 1;
306 }
307
308 timestamp = startswith(line, "__");
309 if (timestamp) {
310 log_notice("Unknown dunder line %s", line);
311 return 1;
312 }
313
314 /* no dunder */
315 return 0;
316}
317
be7e1319 318static int process_data(RemoteSource *source) {
fdfccdbc
ZJS
319 int r;
320
321 switch(source->state) {
322 case STATE_LINE: {
323 char *line, *sep;
a7f7d1bd 324 size_t n = 0;
fdfccdbc
ZJS
325
326 assert(source->data_size == 0);
327
328 r = get_line(source, &line, &n);
329 if (r < 0)
330 return r;
331 if (r == 0) {
332 source->state = STATE_EOF;
333 return r;
334 }
335 assert(n > 0);
336 assert(line[n-1] == '\n');
337
338 if (n == 1) {
cb41ff29 339 log_trace("Received empty line, event is ready");
fdfccdbc
ZJS
340 return 1;
341 }
342
343 r = process_dunder(source, line, n);
92b10cbc 344 if (r != 0)
fdfccdbc 345 return r < 0 ? r : 0;
fdfccdbc
ZJS
346
347 /* MESSAGE=xxx\n
348 or
349 COREDUMP\n
350 LLLLLLLL0011223344...\n
351 */
352 sep = memchr(line, '=', n);
09d801a8 353 if (sep) {
fdfccdbc
ZJS
354 /* chomp newline */
355 n--;
09d801a8
ZJS
356
357 r = iovw_put(&source->iovw, line, n);
358 if (r < 0)
359 return r;
360 } else {
fdfccdbc
ZJS
361 /* replace \n with = */
362 line[n-1] = '=';
fdfccdbc 363
09d801a8
ZJS
364 source->field_len = n;
365 source->state = STATE_DATA_START;
366
367 /* we cannot put the field in iovec until we have all data */
fdfccdbc
ZJS
368 }
369
09d801a8
ZJS
370 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
371
fdfccdbc
ZJS
372 return 0; /* continue */
373 }
374
375 case STATE_DATA_START:
376 assert(source->data_size == 0);
377
378 r = get_data_size(source);
dd87b184 379 // log_debug("get_data_size() -> %d", r);
fdfccdbc
ZJS
380 if (r < 0)
381 return r;
382 if (r == 0) {
383 source->state = STATE_EOF;
384 return 0;
385 }
386
387 source->state = source->data_size > 0 ?
388 STATE_DATA : STATE_DATA_FINISH;
389
390 return 0; /* continue */
391
392 case STATE_DATA: {
393 void *data;
09d801a8 394 char *field;
fdfccdbc
ZJS
395
396 assert(source->data_size > 0);
397
398 r = get_data_data(source, &data);
dd87b184 399 // log_debug("get_data_data() -> %d", r);
fdfccdbc
ZJS
400 if (r < 0)
401 return r;
402 if (r == 0) {
403 source->state = STATE_EOF;
404 return 0;
405 }
406
407 assert(data);
408
09d801a8
ZJS
409 field = (char*) data - sizeof(uint64_t) - source->field_len;
410 memmove(field + sizeof(uint64_t), field, source->field_len);
411
412 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
413 if (r < 0)
fdfccdbc 414 return r;
fdfccdbc
ZJS
415
416 source->state = STATE_DATA_FINISH;
417
418 return 0; /* continue */
419 }
420
421 case STATE_DATA_FINISH:
422 r = get_data_newline(source);
dd87b184 423 // log_debug("get_data_newline() -> %d", r);
fdfccdbc
ZJS
424 if (r < 0)
425 return r;
426 if (r == 0) {
427 source->state = STATE_EOF;
428 return 0;
429 }
430
431 source->data_size = 0;
432 source->state = STATE_LINE;
433
434 return 0; /* continue */
435 default:
436 assert_not_reached("wtf?");
437 }
438}
439
9ff48d09 440int process_source(RemoteSource *source, bool compress, bool seal) {
92b10cbc 441 size_t remain, target;
fdfccdbc
ZJS
442 int r;
443
444 assert(source);
9ff48d09 445 assert(source->writer);
fdfccdbc
ZJS
446
447 r = process_data(source);
448 if (r <= 0)
449 return r;
450
451 /* We have a full event */
0e72da6f 452 log_trace("Received full event from source@%p fd:%d (%s)",
a83f4037 453 source, source->fd, source->name);
fdfccdbc
ZJS
454
455 if (!source->iovw.count) {
456 log_warning("Entry with no payload, skipping");
457 goto freeing;
458 }
459
460 assert(source->iovw.iovec);
461 assert(source->iovw.count);
462
9ff48d09 463 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
fdfccdbc 464 if (r < 0)
c33b3297
MS
465 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
466 iovw_size(&source->iovw));
fdfccdbc
ZJS
467 else
468 r = 1;
469
470 freeing:
471 iovw_free_contents(&source->iovw);
92b10cbc
ZJS
472
473 /* possibly reset buffer position */
474 remain = source->filled - source->offset;
475
476 if (remain == 0) /* no brainer */
477 source->offset = source->scanned = source->filled = 0;
478 else if (source->offset > source->size - source->filled &&
479 source->offset > remain) {
480 memcpy(source->buf, source->buf + source->offset, remain);
481 source->offset = source->scanned = 0;
482 source->filled = remain;
483 }
484
485 target = source->size;
486 while (target > 16 * LINE_CHUNK && remain < target / 2)
487 target /= 2;
488 if (target < source->size) {
489 char *tmp;
490
491 tmp = realloc(source->buf, target);
e4c38cc3 492 if (!tmp)
92b10cbc
ZJS
493 log_warning("Failed to reallocate buffer to (smaller) size %zu",
494 target);
495 else {
496 log_debug("Reallocated buffer from %zu to %zu bytes",
497 source->size, target);
498 source->buf = tmp;
499 source->size = target;
500 }
501 }
502
fdfccdbc
ZJS
503 return r;
504}