]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
tree-wide: remove Emacs lines from all files
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /***
2 This file is part of systemd.
3
4 Copyright 2014 Zbigniew Jędrzejewski-Szmek
5
6 systemd is free software; you can redistribute it and/or modify it
7 under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 systemd is distributed in the hope that it will be useful, but
12 WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with systemd; If not, see <http://www.gnu.org/licenses/>.
18 ***/
19
20 #include "alloc-util.h"
21 #include "fd-util.h"
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24 #include "parse-util.h"
25 #include "string-util.h"
26
/* Granularity (in bytes) by which the line buffer is grown. Parenthesized so
 * that the macro expands safely inside any arithmetic expression. */
#define LINE_CHUNK (8*1024u)
28
29 void source_free(RemoteSource *source) {
30 if (!source)
31 return;
32
33 if (source->fd >= 0 && !source->passive_fd) {
34 log_debug("Closing fd:%d (%s)", source->fd, source->name);
35 safe_close(source->fd);
36 }
37
38 free(source->name);
39 free(source->buf);
40 iovw_free_contents(&source->iovw);
41
42 log_debug("Writer ref count %i", source->writer->n_ref);
43 writer_unref(source->writer);
44
45 sd_event_source_unref(source->event);
46 sd_event_source_unref(source->buffer_event);
47
48 free(source);
49 }
50
51 /**
52 * Initialize zero-filled source with given values. On success, takes
53 * ownerhship of fd and writer, otherwise does not touch them.
54 */
55 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
56
57 RemoteSource *source;
58
59 log_debug("Creating source for %sfd:%d (%s)",
60 passive_fd ? "passive " : "", fd, name);
61
62 assert(fd >= 0);
63
64 source = new0(RemoteSource, 1);
65 if (!source)
66 return NULL;
67
68 source->fd = fd;
69 source->passive_fd = passive_fd;
70 source->name = name;
71 source->writer = writer;
72
73 return source;
74 }
75
76 static char* realloc_buffer(RemoteSource *source, size_t size) {
77 char *b, *old = source->buf;
78
79 b = GREEDY_REALLOC(source->buf, source->size, size);
80 if (!b)
81 return NULL;
82
83 iovw_rebase(&source->iovw, old, source->buf);
84
85 return b;
86 }
87
88 static int get_line(RemoteSource *source, char **line, size_t *size) {
89 ssize_t n;
90 char *c = NULL;
91
92 assert(source);
93 assert(source->state == STATE_LINE);
94 assert(source->offset <= source->filled);
95 assert(source->filled <= source->size);
96 assert(source->buf == NULL || source->size > 0);
97 assert(source->fd >= 0);
98
99 for (;;) {
100 if (source->buf) {
101 size_t start = MAX(source->scanned, source->offset);
102
103 c = memchr(source->buf + start, '\n',
104 source->filled - start);
105 if (c != NULL)
106 break;
107 }
108
109 source->scanned = source->filled;
110 if (source->scanned >= DATA_SIZE_MAX) {
111 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
112 return -E2BIG;
113 }
114
115 if (source->passive_fd)
116 /* we have to wait for some data to come to us */
117 return -EAGAIN;
118
119 /* We know that source->filled is at most DATA_SIZE_MAX, so if
120 we reallocate it, we'll increase the size at least a bit. */
121 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
122 if (source->size - source->filled < LINE_CHUNK &&
123 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
124 return log_oom();
125
126 assert(source->buf);
127 assert(source->size - source->filled >= LINE_CHUNK ||
128 source->size == ENTRY_SIZE_MAX);
129
130 n = read(source->fd,
131 source->buf + source->filled,
132 source->size - source->filled);
133 if (n < 0) {
134 if (errno != EAGAIN)
135 log_error_errno(errno, "read(%d, ..., %zu): %m",
136 source->fd,
137 source->size - source->filled);
138 return -errno;
139 } else if (n == 0)
140 return 0;
141
142 source->filled += n;
143 }
144
145 *line = source->buf + source->offset;
146 *size = c + 1 - source->buf - source->offset;
147 source->offset += *size;
148
149 return 1;
150 }
151
152 int push_data(RemoteSource *source, const char *data, size_t size) {
153 assert(source);
154 assert(source->state != STATE_EOF);
155
156 if (!realloc_buffer(source, source->filled + size)) {
157 log_error("Failed to store received data of size %zu "
158 "(in addition to existing %zu bytes with %zu filled): %s",
159 size, source->size, source->filled, strerror(ENOMEM));
160 return -ENOMEM;
161 }
162
163 memcpy(source->buf + source->filled, data, size);
164 source->filled += size;
165
166 return 0;
167 }
168
169 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
170
171 assert(source);
172 assert(source->state == STATE_DATA_START ||
173 source->state == STATE_DATA ||
174 source->state == STATE_DATA_FINISH);
175 assert(size <= DATA_SIZE_MAX);
176 assert(source->offset <= source->filled);
177 assert(source->filled <= source->size);
178 assert(source->buf != NULL || source->size == 0);
179 assert(source->buf == NULL || source->size > 0);
180 assert(source->fd >= 0);
181 assert(data);
182
183 while (source->filled - source->offset < size) {
184 int n;
185
186 if (source->passive_fd)
187 /* we have to wait for some data to come to us */
188 return -EAGAIN;
189
190 if (!realloc_buffer(source, source->offset + size))
191 return log_oom();
192
193 n = read(source->fd, source->buf + source->filled,
194 source->size - source->filled);
195 if (n < 0) {
196 if (errno != EAGAIN)
197 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
198 source->size - source->filled);
199 return -errno;
200 } else if (n == 0)
201 return 0;
202
203 source->filled += n;
204 }
205
206 *data = source->buf + source->offset;
207 source->offset += size;
208
209 return 1;
210 }
211
212 static int get_data_size(RemoteSource *source) {
213 int r;
214 void *data;
215
216 assert(source);
217 assert(source->state == STATE_DATA_START);
218 assert(source->data_size == 0);
219
220 r = fill_fixed_size(source, &data, sizeof(uint64_t));
221 if (r <= 0)
222 return r;
223
224 source->data_size = le64toh( *(uint64_t *) data );
225 if (source->data_size > DATA_SIZE_MAX) {
226 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
227 source->data_size, DATA_SIZE_MAX);
228 return -EINVAL;
229 }
230 if (source->data_size == 0)
231 log_warning("Binary field with zero length");
232
233 return 1;
234 }
235
236 static int get_data_data(RemoteSource *source, void **data) {
237 int r;
238
239 assert(source);
240 assert(data);
241 assert(source->state == STATE_DATA);
242
243 r = fill_fixed_size(source, data, source->data_size);
244 if (r <= 0)
245 return r;
246
247 return 1;
248 }
249
250 static int get_data_newline(RemoteSource *source) {
251 int r;
252 char *data;
253
254 assert(source);
255 assert(source->state == STATE_DATA_FINISH);
256
257 r = fill_fixed_size(source, (void**) &data, 1);
258 if (r <= 0)
259 return r;
260
261 assert(data);
262 if (*data != '\n') {
263 log_error("expected newline, got '%c'", *data);
264 return -EINVAL;
265 }
266
267 return 1;
268 }
269
270 static int process_dunder(RemoteSource *source, char *line, size_t n) {
271 const char *timestamp;
272 int r;
273
274 assert(line);
275 assert(n > 0);
276 assert(line[n-1] == '\n');
277
278 /* XXX: is it worth to support timestamps in extended format?
279 * We don't produce them, but who knows... */
280
281 timestamp = startswith(line, "__CURSOR=");
282 if (timestamp)
283 /* ignore __CURSOR */
284 return 1;
285
286 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
287 if (timestamp) {
288 long long unsigned x;
289 line[n-1] = '\0';
290 r = safe_atollu(timestamp, &x);
291 if (r < 0)
292 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
293 else
294 source->ts.realtime = x;
295 return r < 0 ? r : 1;
296 }
297
298 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
299 if (timestamp) {
300 long long unsigned x;
301 line[n-1] = '\0';
302 r = safe_atollu(timestamp, &x);
303 if (r < 0)
304 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
305 else
306 source->ts.monotonic = x;
307 return r < 0 ? r : 1;
308 }
309
310 timestamp = startswith(line, "__");
311 if (timestamp) {
312 log_notice("Unknown dunder line %s", line);
313 return 1;
314 }
315
316 /* no dunder */
317 return 0;
318 }
319
320 static int process_data(RemoteSource *source) {
321 int r;
322
323 switch(source->state) {
324 case STATE_LINE: {
325 char *line, *sep;
326 size_t n = 0;
327
328 assert(source->data_size == 0);
329
330 r = get_line(source, &line, &n);
331 if (r < 0)
332 return r;
333 if (r == 0) {
334 source->state = STATE_EOF;
335 return r;
336 }
337 assert(n > 0);
338 assert(line[n-1] == '\n');
339
340 if (n == 1) {
341 log_trace("Received empty line, event is ready");
342 return 1;
343 }
344
345 r = process_dunder(source, line, n);
346 if (r != 0)
347 return r < 0 ? r : 0;
348
349 /* MESSAGE=xxx\n
350 or
351 COREDUMP\n
352 LLLLLLLL0011223344...\n
353 */
354 sep = memchr(line, '=', n);
355 if (sep) {
356 /* chomp newline */
357 n--;
358
359 r = iovw_put(&source->iovw, line, n);
360 if (r < 0)
361 return r;
362 } else {
363 /* replace \n with = */
364 line[n-1] = '=';
365
366 source->field_len = n;
367 source->state = STATE_DATA_START;
368
369 /* we cannot put the field in iovec until we have all data */
370 }
371
372 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
373
374 return 0; /* continue */
375 }
376
377 case STATE_DATA_START:
378 assert(source->data_size == 0);
379
380 r = get_data_size(source);
381 // log_debug("get_data_size() -> %d", r);
382 if (r < 0)
383 return r;
384 if (r == 0) {
385 source->state = STATE_EOF;
386 return 0;
387 }
388
389 source->state = source->data_size > 0 ?
390 STATE_DATA : STATE_DATA_FINISH;
391
392 return 0; /* continue */
393
394 case STATE_DATA: {
395 void *data;
396 char *field;
397
398 assert(source->data_size > 0);
399
400 r = get_data_data(source, &data);
401 // log_debug("get_data_data() -> %d", r);
402 if (r < 0)
403 return r;
404 if (r == 0) {
405 source->state = STATE_EOF;
406 return 0;
407 }
408
409 assert(data);
410
411 field = (char*) data - sizeof(uint64_t) - source->field_len;
412 memmove(field + sizeof(uint64_t), field, source->field_len);
413
414 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
415 if (r < 0)
416 return r;
417
418 source->state = STATE_DATA_FINISH;
419
420 return 0; /* continue */
421 }
422
423 case STATE_DATA_FINISH:
424 r = get_data_newline(source);
425 // log_debug("get_data_newline() -> %d", r);
426 if (r < 0)
427 return r;
428 if (r == 0) {
429 source->state = STATE_EOF;
430 return 0;
431 }
432
433 source->data_size = 0;
434 source->state = STATE_LINE;
435
436 return 0; /* continue */
437 default:
438 assert_not_reached("wtf?");
439 }
440 }
441
442 int process_source(RemoteSource *source, bool compress, bool seal) {
443 size_t remain, target;
444 int r;
445
446 assert(source);
447 assert(source->writer);
448
449 r = process_data(source);
450 if (r <= 0)
451 return r;
452
453 /* We have a full event */
454 log_trace("Received full event from source@%p fd:%d (%s)",
455 source, source->fd, source->name);
456
457 if (!source->iovw.count) {
458 log_warning("Entry with no payload, skipping");
459 goto freeing;
460 }
461
462 assert(source->iovw.iovec);
463 assert(source->iovw.count);
464
465 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
466 if (r < 0)
467 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
468 iovw_size(&source->iovw));
469 else
470 r = 1;
471
472 freeing:
473 iovw_free_contents(&source->iovw);
474
475 /* possibly reset buffer position */
476 remain = source->filled - source->offset;
477
478 if (remain == 0) /* no brainer */
479 source->offset = source->scanned = source->filled = 0;
480 else if (source->offset > source->size - source->filled &&
481 source->offset > remain) {
482 memcpy(source->buf, source->buf + source->offset, remain);
483 source->offset = source->scanned = 0;
484 source->filled = remain;
485 }
486
487 target = source->size;
488 while (target > 16 * LINE_CHUNK && remain < target / 2)
489 target /= 2;
490 if (target < source->size) {
491 char *tmp;
492
493 tmp = realloc(source->buf, target);
494 if (!tmp)
495 log_warning("Failed to reallocate buffer to (smaller) size %zu",
496 target);
497 else {
498 log_debug("Reallocated buffer from %zu to %zu bytes",
499 source->size, target);
500 source->buf = tmp;
501 source->size = target;
502 }
503 }
504
505 return r;
506 }