]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
Merge pull request #1213 from evverx/systemd-notify-log
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 8*1024u
26
27 void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
34 }
35
36 free(source->name);
37 free(source->buf);
38 iovw_free_contents(&source->iovw);
39
40 log_debug("Writer ref count %i", source->writer->n_ref);
41 writer_unref(source->writer);
42
43 sd_event_source_unref(source->event);
44 sd_event_source_unref(source->buffer_event);
45
46 free(source);
47 }
48
49 /**
50 * Initialize zero-filled source with given values. On success, takes
51 * ownerhship of fd and writer, otherwise does not touch them.
52 */
53 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
54
55 RemoteSource *source;
56
57 log_debug("Creating source for %sfd:%d (%s)",
58 passive_fd ? "passive " : "", fd, name);
59
60 assert(fd >= 0);
61
62 source = new0(RemoteSource, 1);
63 if (!source)
64 return NULL;
65
66 source->fd = fd;
67 source->passive_fd = passive_fd;
68 source->name = name;
69 source->writer = writer;
70
71 return source;
72 }
73
74 static char* realloc_buffer(RemoteSource *source, size_t size) {
75 char *b, *old = source->buf;
76
77 b = GREEDY_REALLOC(source->buf, source->size, size);
78 if (!b)
79 return NULL;
80
81 iovw_rebase(&source->iovw, old, source->buf);
82
83 return b;
84 }
85
86 static int get_line(RemoteSource *source, char **line, size_t *size) {
87 ssize_t n;
88 char *c = NULL;
89
90 assert(source);
91 assert(source->state == STATE_LINE);
92 assert(source->offset <= source->filled);
93 assert(source->filled <= source->size);
94 assert(source->buf == NULL || source->size > 0);
95 assert(source->fd >= 0);
96
97 for (;;) {
98 if (source->buf) {
99 size_t start = MAX(source->scanned, source->offset);
100
101 c = memchr(source->buf + start, '\n',
102 source->filled - start);
103 if (c != NULL)
104 break;
105 }
106
107 source->scanned = source->filled;
108 if (source->scanned >= DATA_SIZE_MAX) {
109 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
110 return -E2BIG;
111 }
112
113 if (source->passive_fd)
114 /* we have to wait for some data to come to us */
115 return -EAGAIN;
116
117 /* We know that source->filled is at most DATA_SIZE_MAX, so if
118 we reallocate it, we'll increase the size at least a bit. */
119 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
120 if (source->size - source->filled < LINE_CHUNK &&
121 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
122 return log_oom();
123
124 assert(source->buf);
125 assert(source->size - source->filled >= LINE_CHUNK ||
126 source->size == ENTRY_SIZE_MAX);
127
128 n = read(source->fd,
129 source->buf + source->filled,
130 source->size - source->filled);
131 if (n < 0) {
132 if (errno != EAGAIN)
133 log_error_errno(errno, "read(%d, ..., %zu): %m",
134 source->fd,
135 source->size - source->filled);
136 return -errno;
137 } else if (n == 0)
138 return 0;
139
140 source->filled += n;
141 }
142
143 *line = source->buf + source->offset;
144 *size = c + 1 - source->buf - source->offset;
145 source->offset += *size;
146
147 return 1;
148 }
149
150 int push_data(RemoteSource *source, const char *data, size_t size) {
151 assert(source);
152 assert(source->state != STATE_EOF);
153
154 if (!realloc_buffer(source, source->filled + size)) {
155 log_error("Failed to store received data of size %zu "
156 "(in addition to existing %zu bytes with %zu filled): %s",
157 size, source->size, source->filled, strerror(ENOMEM));
158 return -ENOMEM;
159 }
160
161 memcpy(source->buf + source->filled, data, size);
162 source->filled += size;
163
164 return 0;
165 }
166
167 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
168
169 assert(source);
170 assert(source->state == STATE_DATA_START ||
171 source->state == STATE_DATA ||
172 source->state == STATE_DATA_FINISH);
173 assert(size <= DATA_SIZE_MAX);
174 assert(source->offset <= source->filled);
175 assert(source->filled <= source->size);
176 assert(source->buf != NULL || source->size == 0);
177 assert(source->buf == NULL || source->size > 0);
178 assert(source->fd >= 0);
179 assert(data);
180
181 while (source->filled - source->offset < size) {
182 int n;
183
184 if (source->passive_fd)
185 /* we have to wait for some data to come to us */
186 return -EAGAIN;
187
188 if (!realloc_buffer(source, source->offset + size))
189 return log_oom();
190
191 n = read(source->fd, source->buf + source->filled,
192 source->size - source->filled);
193 if (n < 0) {
194 if (errno != EAGAIN)
195 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
196 source->size - source->filled);
197 return -errno;
198 } else if (n == 0)
199 return 0;
200
201 source->filled += n;
202 }
203
204 *data = source->buf + source->offset;
205 source->offset += size;
206
207 return 1;
208 }
209
210 static int get_data_size(RemoteSource *source) {
211 int r;
212 void *data;
213
214 assert(source);
215 assert(source->state == STATE_DATA_START);
216 assert(source->data_size == 0);
217
218 r = fill_fixed_size(source, &data, sizeof(uint64_t));
219 if (r <= 0)
220 return r;
221
222 source->data_size = le64toh( *(uint64_t *) data );
223 if (source->data_size > DATA_SIZE_MAX) {
224 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
225 source->data_size, DATA_SIZE_MAX);
226 return -EINVAL;
227 }
228 if (source->data_size == 0)
229 log_warning("Binary field with zero length");
230
231 return 1;
232 }
233
234 static int get_data_data(RemoteSource *source, void **data) {
235 int r;
236
237 assert(source);
238 assert(data);
239 assert(source->state == STATE_DATA);
240
241 r = fill_fixed_size(source, data, source->data_size);
242 if (r <= 0)
243 return r;
244
245 return 1;
246 }
247
248 static int get_data_newline(RemoteSource *source) {
249 int r;
250 char *data;
251
252 assert(source);
253 assert(source->state == STATE_DATA_FINISH);
254
255 r = fill_fixed_size(source, (void**) &data, 1);
256 if (r <= 0)
257 return r;
258
259 assert(data);
260 if (*data != '\n') {
261 log_error("expected newline, got '%c'", *data);
262 return -EINVAL;
263 }
264
265 return 1;
266 }
267
268 static int process_dunder(RemoteSource *source, char *line, size_t n) {
269 const char *timestamp;
270 int r;
271
272 assert(line);
273 assert(n > 0);
274 assert(line[n-1] == '\n');
275
276 /* XXX: is it worth to support timestamps in extended format?
277 * We don't produce them, but who knows... */
278
279 timestamp = startswith(line, "__CURSOR=");
280 if (timestamp)
281 /* ignore __CURSOR */
282 return 1;
283
284 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
285 if (timestamp) {
286 long long unsigned x;
287 line[n-1] = '\0';
288 r = safe_atollu(timestamp, &x);
289 if (r < 0)
290 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
291 else
292 source->ts.realtime = x;
293 return r < 0 ? r : 1;
294 }
295
296 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
297 if (timestamp) {
298 long long unsigned x;
299 line[n-1] = '\0';
300 r = safe_atollu(timestamp, &x);
301 if (r < 0)
302 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
303 else
304 source->ts.monotonic = x;
305 return r < 0 ? r : 1;
306 }
307
308 timestamp = startswith(line, "__");
309 if (timestamp) {
310 log_notice("Unknown dunder line %s", line);
311 return 1;
312 }
313
314 /* no dunder */
315 return 0;
316 }
317
318 static int process_data(RemoteSource *source) {
319 int r;
320
321 switch(source->state) {
322 case STATE_LINE: {
323 char *line, *sep;
324 size_t n = 0;
325
326 assert(source->data_size == 0);
327
328 r = get_line(source, &line, &n);
329 if (r < 0)
330 return r;
331 if (r == 0) {
332 source->state = STATE_EOF;
333 return r;
334 }
335 assert(n > 0);
336 assert(line[n-1] == '\n');
337
338 if (n == 1) {
339 log_trace("Received empty line, event is ready");
340 return 1;
341 }
342
343 r = process_dunder(source, line, n);
344 if (r != 0)
345 return r < 0 ? r : 0;
346
347 /* MESSAGE=xxx\n
348 or
349 COREDUMP\n
350 LLLLLLLL0011223344...\n
351 */
352 sep = memchr(line, '=', n);
353 if (sep) {
354 /* chomp newline */
355 n--;
356
357 r = iovw_put(&source->iovw, line, n);
358 if (r < 0)
359 return r;
360 } else {
361 /* replace \n with = */
362 line[n-1] = '=';
363
364 source->field_len = n;
365 source->state = STATE_DATA_START;
366
367 /* we cannot put the field in iovec until we have all data */
368 }
369
370 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
371
372 return 0; /* continue */
373 }
374
375 case STATE_DATA_START:
376 assert(source->data_size == 0);
377
378 r = get_data_size(source);
379 // log_debug("get_data_size() -> %d", r);
380 if (r < 0)
381 return r;
382 if (r == 0) {
383 source->state = STATE_EOF;
384 return 0;
385 }
386
387 source->state = source->data_size > 0 ?
388 STATE_DATA : STATE_DATA_FINISH;
389
390 return 0; /* continue */
391
392 case STATE_DATA: {
393 void *data;
394 char *field;
395
396 assert(source->data_size > 0);
397
398 r = get_data_data(source, &data);
399 // log_debug("get_data_data() -> %d", r);
400 if (r < 0)
401 return r;
402 if (r == 0) {
403 source->state = STATE_EOF;
404 return 0;
405 }
406
407 assert(data);
408
409 field = (char*) data - sizeof(uint64_t) - source->field_len;
410 memmove(field + sizeof(uint64_t), field, source->field_len);
411
412 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
413 if (r < 0)
414 return r;
415
416 source->state = STATE_DATA_FINISH;
417
418 return 0; /* continue */
419 }
420
421 case STATE_DATA_FINISH:
422 r = get_data_newline(source);
423 // log_debug("get_data_newline() -> %d", r);
424 if (r < 0)
425 return r;
426 if (r == 0) {
427 source->state = STATE_EOF;
428 return 0;
429 }
430
431 source->data_size = 0;
432 source->state = STATE_LINE;
433
434 return 0; /* continue */
435 default:
436 assert_not_reached("wtf?");
437 }
438 }
439
440 int process_source(RemoteSource *source, bool compress, bool seal) {
441 size_t remain, target;
442 int r;
443
444 assert(source);
445 assert(source->writer);
446
447 r = process_data(source);
448 if (r <= 0)
449 return r;
450
451 /* We have a full event */
452 log_trace("Received full event from source@%p fd:%d (%s)",
453 source, source->fd, source->name);
454
455 if (!source->iovw.count) {
456 log_warning("Entry with no payload, skipping");
457 goto freeing;
458 }
459
460 assert(source->iovw.iovec);
461 assert(source->iovw.count);
462
463 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
464 if (r < 0)
465 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
466 iovw_size(&source->iovw));
467 else
468 r = 1;
469
470 freeing:
471 iovw_free_contents(&source->iovw);
472
473 /* possibly reset buffer position */
474 remain = source->filled - source->offset;
475
476 if (remain == 0) /* no brainer */
477 source->offset = source->scanned = source->filled = 0;
478 else if (source->offset > source->size - source->filled &&
479 source->offset > remain) {
480 memcpy(source->buf, source->buf + source->offset, remain);
481 source->offset = source->scanned = 0;
482 source->filled = remain;
483 }
484
485 target = source->size;
486 while (target > 16 * LINE_CHUNK && remain < target / 2)
487 target /= 2;
488 if (target < source->size) {
489 char *tmp;
490
491 tmp = realloc(source->buf, target);
492 if (!tmp)
493 log_warning("Failed to reallocate buffer to (smaller) size %zu",
494 target);
495 else {
496 log_debug("Reallocated buffer from %zu to %zu bytes",
497 source->size, target);
498 source->buf = tmp;
499 source->size = target;
500 }
501 }
502
503 return r;
504 }