]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
util-lib: split out fd-related operations into fd-util.[ch]
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "fd-util.h"
23 #include "journal-remote-parse.h"
24 #include "journald-native.h"
25 #include "string-util.h"
26
/* Step, in bytes, by which the receive buffer is grown while reading lines.
 * The expansion is parenthesized so that the macro behaves as a single value
 * inside larger expressions (e.g. as the right-hand side of a division). */
#define LINE_CHUNK (8*1024u)
28
29 void source_free(RemoteSource *source) {
30 if (!source)
31 return;
32
33 if (source->fd >= 0 && !source->passive_fd) {
34 log_debug("Closing fd:%d (%s)", source->fd, source->name);
35 safe_close(source->fd);
36 }
37
38 free(source->name);
39 free(source->buf);
40 iovw_free_contents(&source->iovw);
41
42 log_debug("Writer ref count %i", source->writer->n_ref);
43 writer_unref(source->writer);
44
45 sd_event_source_unref(source->event);
46 sd_event_source_unref(source->buffer_event);
47
48 free(source);
49 }
50
51 /**
52 * Initialize zero-filled source with given values. On success, takes
53 * ownerhship of fd and writer, otherwise does not touch them.
54 */
55 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
56
57 RemoteSource *source;
58
59 log_debug("Creating source for %sfd:%d (%s)",
60 passive_fd ? "passive " : "", fd, name);
61
62 assert(fd >= 0);
63
64 source = new0(RemoteSource, 1);
65 if (!source)
66 return NULL;
67
68 source->fd = fd;
69 source->passive_fd = passive_fd;
70 source->name = name;
71 source->writer = writer;
72
73 return source;
74 }
75
76 static char* realloc_buffer(RemoteSource *source, size_t size) {
77 char *b, *old = source->buf;
78
79 b = GREEDY_REALLOC(source->buf, source->size, size);
80 if (!b)
81 return NULL;
82
83 iovw_rebase(&source->iovw, old, source->buf);
84
85 return b;
86 }
87
88 static int get_line(RemoteSource *source, char **line, size_t *size) {
89 ssize_t n;
90 char *c = NULL;
91
92 assert(source);
93 assert(source->state == STATE_LINE);
94 assert(source->offset <= source->filled);
95 assert(source->filled <= source->size);
96 assert(source->buf == NULL || source->size > 0);
97 assert(source->fd >= 0);
98
99 for (;;) {
100 if (source->buf) {
101 size_t start = MAX(source->scanned, source->offset);
102
103 c = memchr(source->buf + start, '\n',
104 source->filled - start);
105 if (c != NULL)
106 break;
107 }
108
109 source->scanned = source->filled;
110 if (source->scanned >= DATA_SIZE_MAX) {
111 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
112 return -E2BIG;
113 }
114
115 if (source->passive_fd)
116 /* we have to wait for some data to come to us */
117 return -EAGAIN;
118
119 /* We know that source->filled is at most DATA_SIZE_MAX, so if
120 we reallocate it, we'll increase the size at least a bit. */
121 assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
122 if (source->size - source->filled < LINE_CHUNK &&
123 !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
124 return log_oom();
125
126 assert(source->buf);
127 assert(source->size - source->filled >= LINE_CHUNK ||
128 source->size == ENTRY_SIZE_MAX);
129
130 n = read(source->fd,
131 source->buf + source->filled,
132 source->size - source->filled);
133 if (n < 0) {
134 if (errno != EAGAIN)
135 log_error_errno(errno, "read(%d, ..., %zu): %m",
136 source->fd,
137 source->size - source->filled);
138 return -errno;
139 } else if (n == 0)
140 return 0;
141
142 source->filled += n;
143 }
144
145 *line = source->buf + source->offset;
146 *size = c + 1 - source->buf - source->offset;
147 source->offset += *size;
148
149 return 1;
150 }
151
152 int push_data(RemoteSource *source, const char *data, size_t size) {
153 assert(source);
154 assert(source->state != STATE_EOF);
155
156 if (!realloc_buffer(source, source->filled + size)) {
157 log_error("Failed to store received data of size %zu "
158 "(in addition to existing %zu bytes with %zu filled): %s",
159 size, source->size, source->filled, strerror(ENOMEM));
160 return -ENOMEM;
161 }
162
163 memcpy(source->buf + source->filled, data, size);
164 source->filled += size;
165
166 return 0;
167 }
168
169 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
170
171 assert(source);
172 assert(source->state == STATE_DATA_START ||
173 source->state == STATE_DATA ||
174 source->state == STATE_DATA_FINISH);
175 assert(size <= DATA_SIZE_MAX);
176 assert(source->offset <= source->filled);
177 assert(source->filled <= source->size);
178 assert(source->buf != NULL || source->size == 0);
179 assert(source->buf == NULL || source->size > 0);
180 assert(source->fd >= 0);
181 assert(data);
182
183 while (source->filled - source->offset < size) {
184 int n;
185
186 if (source->passive_fd)
187 /* we have to wait for some data to come to us */
188 return -EAGAIN;
189
190 if (!realloc_buffer(source, source->offset + size))
191 return log_oom();
192
193 n = read(source->fd, source->buf + source->filled,
194 source->size - source->filled);
195 if (n < 0) {
196 if (errno != EAGAIN)
197 log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
198 source->size - source->filled);
199 return -errno;
200 } else if (n == 0)
201 return 0;
202
203 source->filled += n;
204 }
205
206 *data = source->buf + source->offset;
207 source->offset += size;
208
209 return 1;
210 }
211
212 static int get_data_size(RemoteSource *source) {
213 int r;
214 void *data;
215
216 assert(source);
217 assert(source->state == STATE_DATA_START);
218 assert(source->data_size == 0);
219
220 r = fill_fixed_size(source, &data, sizeof(uint64_t));
221 if (r <= 0)
222 return r;
223
224 source->data_size = le64toh( *(uint64_t *) data );
225 if (source->data_size > DATA_SIZE_MAX) {
226 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
227 source->data_size, DATA_SIZE_MAX);
228 return -EINVAL;
229 }
230 if (source->data_size == 0)
231 log_warning("Binary field with zero length");
232
233 return 1;
234 }
235
236 static int get_data_data(RemoteSource *source, void **data) {
237 int r;
238
239 assert(source);
240 assert(data);
241 assert(source->state == STATE_DATA);
242
243 r = fill_fixed_size(source, data, source->data_size);
244 if (r <= 0)
245 return r;
246
247 return 1;
248 }
249
250 static int get_data_newline(RemoteSource *source) {
251 int r;
252 char *data;
253
254 assert(source);
255 assert(source->state == STATE_DATA_FINISH);
256
257 r = fill_fixed_size(source, (void**) &data, 1);
258 if (r <= 0)
259 return r;
260
261 assert(data);
262 if (*data != '\n') {
263 log_error("expected newline, got '%c'", *data);
264 return -EINVAL;
265 }
266
267 return 1;
268 }
269
270 static int process_dunder(RemoteSource *source, char *line, size_t n) {
271 const char *timestamp;
272 int r;
273
274 assert(line);
275 assert(n > 0);
276 assert(line[n-1] == '\n');
277
278 /* XXX: is it worth to support timestamps in extended format?
279 * We don't produce them, but who knows... */
280
281 timestamp = startswith(line, "__CURSOR=");
282 if (timestamp)
283 /* ignore __CURSOR */
284 return 1;
285
286 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
287 if (timestamp) {
288 long long unsigned x;
289 line[n-1] = '\0';
290 r = safe_atollu(timestamp, &x);
291 if (r < 0)
292 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
293 else
294 source->ts.realtime = x;
295 return r < 0 ? r : 1;
296 }
297
298 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
299 if (timestamp) {
300 long long unsigned x;
301 line[n-1] = '\0';
302 r = safe_atollu(timestamp, &x);
303 if (r < 0)
304 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
305 else
306 source->ts.monotonic = x;
307 return r < 0 ? r : 1;
308 }
309
310 timestamp = startswith(line, "__");
311 if (timestamp) {
312 log_notice("Unknown dunder line %s", line);
313 return 1;
314 }
315
316 /* no dunder */
317 return 0;
318 }
319
320 static int process_data(RemoteSource *source) {
321 int r;
322
323 switch(source->state) {
324 case STATE_LINE: {
325 char *line, *sep;
326 size_t n = 0;
327
328 assert(source->data_size == 0);
329
330 r = get_line(source, &line, &n);
331 if (r < 0)
332 return r;
333 if (r == 0) {
334 source->state = STATE_EOF;
335 return r;
336 }
337 assert(n > 0);
338 assert(line[n-1] == '\n');
339
340 if (n == 1) {
341 log_trace("Received empty line, event is ready");
342 return 1;
343 }
344
345 r = process_dunder(source, line, n);
346 if (r != 0)
347 return r < 0 ? r : 0;
348
349 /* MESSAGE=xxx\n
350 or
351 COREDUMP\n
352 LLLLLLLL0011223344...\n
353 */
354 sep = memchr(line, '=', n);
355 if (sep) {
356 /* chomp newline */
357 n--;
358
359 r = iovw_put(&source->iovw, line, n);
360 if (r < 0)
361 return r;
362 } else {
363 /* replace \n with = */
364 line[n-1] = '=';
365
366 source->field_len = n;
367 source->state = STATE_DATA_START;
368
369 /* we cannot put the field in iovec until we have all data */
370 }
371
372 log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
373
374 return 0; /* continue */
375 }
376
377 case STATE_DATA_START:
378 assert(source->data_size == 0);
379
380 r = get_data_size(source);
381 // log_debug("get_data_size() -> %d", r);
382 if (r < 0)
383 return r;
384 if (r == 0) {
385 source->state = STATE_EOF;
386 return 0;
387 }
388
389 source->state = source->data_size > 0 ?
390 STATE_DATA : STATE_DATA_FINISH;
391
392 return 0; /* continue */
393
394 case STATE_DATA: {
395 void *data;
396 char *field;
397
398 assert(source->data_size > 0);
399
400 r = get_data_data(source, &data);
401 // log_debug("get_data_data() -> %d", r);
402 if (r < 0)
403 return r;
404 if (r == 0) {
405 source->state = STATE_EOF;
406 return 0;
407 }
408
409 assert(data);
410
411 field = (char*) data - sizeof(uint64_t) - source->field_len;
412 memmove(field + sizeof(uint64_t), field, source->field_len);
413
414 r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
415 if (r < 0)
416 return r;
417
418 source->state = STATE_DATA_FINISH;
419
420 return 0; /* continue */
421 }
422
423 case STATE_DATA_FINISH:
424 r = get_data_newline(source);
425 // log_debug("get_data_newline() -> %d", r);
426 if (r < 0)
427 return r;
428 if (r == 0) {
429 source->state = STATE_EOF;
430 return 0;
431 }
432
433 source->data_size = 0;
434 source->state = STATE_LINE;
435
436 return 0; /* continue */
437 default:
438 assert_not_reached("wtf?");
439 }
440 }
441
442 int process_source(RemoteSource *source, bool compress, bool seal) {
443 size_t remain, target;
444 int r;
445
446 assert(source);
447 assert(source->writer);
448
449 r = process_data(source);
450 if (r <= 0)
451 return r;
452
453 /* We have a full event */
454 log_trace("Received full event from source@%p fd:%d (%s)",
455 source, source->fd, source->name);
456
457 if (!source->iovw.count) {
458 log_warning("Entry with no payload, skipping");
459 goto freeing;
460 }
461
462 assert(source->iovw.iovec);
463 assert(source->iovw.count);
464
465 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
466 if (r < 0)
467 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
468 iovw_size(&source->iovw));
469 else
470 r = 1;
471
472 freeing:
473 iovw_free_contents(&source->iovw);
474
475 /* possibly reset buffer position */
476 remain = source->filled - source->offset;
477
478 if (remain == 0) /* no brainer */
479 source->offset = source->scanned = source->filled = 0;
480 else if (source->offset > source->size - source->filled &&
481 source->offset > remain) {
482 memcpy(source->buf, source->buf + source->offset, remain);
483 source->offset = source->scanned = 0;
484 source->filled = remain;
485 }
486
487 target = source->size;
488 while (target > 16 * LINE_CHUNK && remain < target / 2)
489 target /= 2;
490 if (target < source->size) {
491 char *tmp;
492
493 tmp = realloc(source->buf, target);
494 if (!tmp)
495 log_warning("Failed to reallocate buffer to (smaller) size %zu",
496 target);
497 else {
498 log_debug("Reallocated buffer from %zu to %zu bytes",
499 source->size, target);
500 source->buf = tmp;
501 source->size = target;
502 }
503 }
504
505 return r;
506 }