git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
treewide: use log_*_errno whenever %m is in the format string
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
/* Chunk size in bytes: get_line() grows the read buffer by this much, and
 * process_source() uses a multiple of it as the lower bound for shrinking.
 * The expansion is parenthesized so that it acts as a single operand in any
 * surrounding expression (e.g. division or modulo). */
#define LINE_CHUNK (8*1024u)
26
27 void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
34 }
35
36 free(source->name);
37 free(source->buf);
38 iovw_free_contents(&source->iovw);
39
40 log_debug("Writer ref count %u", source->writer->n_ref);
41 writer_unref(source->writer);
42
43 sd_event_source_unref(source->event);
44
45 free(source);
46 }
47
48 /**
49 * Initialize zero-filled source with given values. On success, takes
50 * ownerhship of fd and writer, otherwise does not touch them.
51 */
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
53
54 RemoteSource *source;
55
56 log_debug("Creating source for %sfd:%d (%s)",
57 passive_fd ? "passive " : "", fd, name);
58
59 assert(fd >= 0);
60
61 source = new0(RemoteSource, 1);
62 if (!source)
63 return NULL;
64
65 source->fd = fd;
66 source->passive_fd = passive_fd;
67 source->name = name;
68 source->writer = writer;
69
70 return source;
71 }
72
73 static char* realloc_buffer(RemoteSource *source, size_t size) {
74 char *b, *old = source->buf;
75
76 b = GREEDY_REALLOC(source->buf, source->size, size);
77 if (!b)
78 return NULL;
79
80 iovw_rebase(&source->iovw, old, source->buf);
81
82 return b;
83 }
84
85 static int get_line(RemoteSource *source, char **line, size_t *size) {
86 ssize_t n;
87 char *c = NULL;
88
89 assert(source);
90 assert(source->state == STATE_LINE);
91 assert(source->offset <= source->filled);
92 assert(source->filled <= source->size);
93 assert(source->buf == NULL || source->size > 0);
94 assert(source->fd >= 0);
95
96 while (true) {
97 if (source->buf) {
98 size_t start = MAX(source->scanned, source->offset);
99
100 c = memchr(source->buf + start, '\n',
101 source->filled - start);
102 if (c != NULL)
103 break;
104 }
105
106 source->scanned = source->filled;
107 if (source->scanned >= DATA_SIZE_MAX) {
108 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
109 return -E2BIG;
110 }
111
112 if (source->passive_fd)
113 /* we have to wait for some data to come to us */
114 return -EWOULDBLOCK;
115
116 if (source->size - source->filled < LINE_CHUNK &&
117 !realloc_buffer(source,
118 MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
119 return log_oom();
120
121 assert(source->size - source->filled >= LINE_CHUNK ||
122 source->size == ENTRY_SIZE_MAX);
123
124 n = read(source->fd, source->buf + source->filled,
125 source->size - source->filled);
126 if (n < 0) {
127 if (errno != EAGAIN && errno != EWOULDBLOCK)
128 log_error_errno(errno, "read(%d, ..., %zd): %m", source->fd,
129 source->size - source->filled);
130 return -errno;
131 } else if (n == 0)
132 return 0;
133
134 source->filled += n;
135 }
136
137 *line = source->buf + source->offset;
138 *size = c + 1 - source->buf - source->offset;
139 source->offset += *size;
140
141 return 1;
142 }
143
144 int push_data(RemoteSource *source, const char *data, size_t size) {
145 assert(source);
146 assert(source->state != STATE_EOF);
147
148 if (!realloc_buffer(source, source->filled + size)) {
149 log_error("Failed to store received data of size %zu "
150 "(in addition to existing %zu bytes with %zu filled): %s",
151 size, source->size, source->filled, strerror(ENOMEM));
152 return -ENOMEM;
153 }
154
155 memcpy(source->buf + source->filled, data, size);
156 source->filled += size;
157
158 return 0;
159 }
160
161 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
162
163 assert(source);
164 assert(source->state == STATE_DATA_START ||
165 source->state == STATE_DATA ||
166 source->state == STATE_DATA_FINISH);
167 assert(size <= DATA_SIZE_MAX);
168 assert(source->offset <= source->filled);
169 assert(source->filled <= source->size);
170 assert(source->buf != NULL || source->size == 0);
171 assert(source->buf == NULL || source->size > 0);
172 assert(source->fd >= 0);
173 assert(data);
174
175 while (source->filled - source->offset < size) {
176 int n;
177
178 if (source->passive_fd)
179 /* we have to wait for some data to come to us */
180 return -EWOULDBLOCK;
181
182 if (!realloc_buffer(source, source->offset + size))
183 return log_oom();
184
185 n = read(source->fd, source->buf + source->filled,
186 source->size - source->filled);
187 if (n < 0) {
188 if (errno != EAGAIN && errno != EWOULDBLOCK)
189 log_error_errno(errno, "read(%d, ..., %zd): %m", source->fd,
190 source->size - source->filled);
191 return -errno;
192 } else if (n == 0)
193 return 0;
194
195 source->filled += n;
196 }
197
198 *data = source->buf + source->offset;
199 source->offset += size;
200
201 return 1;
202 }
203
204 static int get_data_size(RemoteSource *source) {
205 int r;
206 void *data;
207
208 assert(source);
209 assert(source->state == STATE_DATA_START);
210 assert(source->data_size == 0);
211
212 r = fill_fixed_size(source, &data, sizeof(uint64_t));
213 if (r <= 0)
214 return r;
215
216 source->data_size = le64toh( *(uint64_t *) data );
217 if (source->data_size > DATA_SIZE_MAX) {
218 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
219 source->data_size, DATA_SIZE_MAX);
220 return -EINVAL;
221 }
222 if (source->data_size == 0)
223 log_warning("Binary field with zero length");
224
225 return 1;
226 }
227
228 static int get_data_data(RemoteSource *source, void **data) {
229 int r;
230
231 assert(source);
232 assert(data);
233 assert(source->state == STATE_DATA);
234
235 r = fill_fixed_size(source, data, source->data_size);
236 if (r <= 0)
237 return r;
238
239 return 1;
240 }
241
242 static int get_data_newline(RemoteSource *source) {
243 int r;
244 char *data;
245
246 assert(source);
247 assert(source->state == STATE_DATA_FINISH);
248
249 r = fill_fixed_size(source, (void**) &data, 1);
250 if (r <= 0)
251 return r;
252
253 assert(data);
254 if (*data != '\n') {
255 log_error("expected newline, got '%c'", *data);
256 return -EINVAL;
257 }
258
259 return 1;
260 }
261
262 static int process_dunder(RemoteSource *source, char *line, size_t n) {
263 const char *timestamp;
264 int r;
265
266 assert(line);
267 assert(n > 0);
268 assert(line[n-1] == '\n');
269
270 /* XXX: is it worth to support timestamps in extended format?
271 * We don't produce them, but who knows... */
272
273 timestamp = startswith(line, "__CURSOR=");
274 if (timestamp)
275 /* ignore __CURSOR */
276 return 1;
277
278 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
279 if (timestamp) {
280 long long unsigned x;
281 line[n-1] = '\0';
282 r = safe_atollu(timestamp, &x);
283 if (r < 0)
284 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
285 else
286 source->ts.realtime = x;
287 return r < 0 ? r : 1;
288 }
289
290 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
291 if (timestamp) {
292 long long unsigned x;
293 line[n-1] = '\0';
294 r = safe_atollu(timestamp, &x);
295 if (r < 0)
296 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
297 else
298 source->ts.monotonic = x;
299 return r < 0 ? r : 1;
300 }
301
302 timestamp = startswith(line, "__");
303 if (timestamp) {
304 log_notice("Unknown dunder line %s", line);
305 return 1;
306 }
307
308 /* no dunder */
309 return 0;
310 }
311
312 int process_data(RemoteSource *source) {
313 int r;
314
315 switch(source->state) {
316 case STATE_LINE: {
317 char *line, *sep;
318 size_t n;
319
320 assert(source->data_size == 0);
321
322 r = get_line(source, &line, &n);
323 if (r < 0)
324 return r;
325 if (r == 0) {
326 source->state = STATE_EOF;
327 return r;
328 }
329 assert(n > 0);
330 assert(line[n-1] == '\n');
331
332 if (n == 1) {
333 log_trace("Received empty line, event is ready");
334 return 1;
335 }
336
337 r = process_dunder(source, line, n);
338 if (r != 0)
339 return r < 0 ? r : 0;
340
341 /* MESSAGE=xxx\n
342 or
343 COREDUMP\n
344 LLLLLLLL0011223344...\n
345 */
346 sep = memchr(line, '=', n);
347 if (sep)
348 /* chomp newline */
349 n--;
350 else
351 /* replace \n with = */
352 line[n-1] = '=';
353 log_trace("Received: %.*s", (int) n, line);
354
355 r = iovw_put(&source->iovw, line, n);
356 if (r < 0) {
357 log_error("Failed to put line in iovect");
358 return r;
359 }
360
361 if (!sep)
362 source->state = STATE_DATA_START;
363 return 0; /* continue */
364 }
365
366 case STATE_DATA_START:
367 assert(source->data_size == 0);
368
369 r = get_data_size(source);
370 // log_debug("get_data_size() -> %d", r);
371 if (r < 0)
372 return r;
373 if (r == 0) {
374 source->state = STATE_EOF;
375 return 0;
376 }
377
378 source->state = source->data_size > 0 ?
379 STATE_DATA : STATE_DATA_FINISH;
380
381 return 0; /* continue */
382
383 case STATE_DATA: {
384 void *data;
385
386 assert(source->data_size > 0);
387
388 r = get_data_data(source, &data);
389 // log_debug("get_data_data() -> %d", r);
390 if (r < 0)
391 return r;
392 if (r == 0) {
393 source->state = STATE_EOF;
394 return 0;
395 }
396
397 assert(data);
398
399 r = iovw_put(&source->iovw, data, source->data_size);
400 if (r < 0) {
401 log_error("failed to put binary buffer in iovect");
402 return r;
403 }
404
405 source->state = STATE_DATA_FINISH;
406
407 return 0; /* continue */
408 }
409
410 case STATE_DATA_FINISH:
411 r = get_data_newline(source);
412 // log_debug("get_data_newline() -> %d", r);
413 if (r < 0)
414 return r;
415 if (r == 0) {
416 source->state = STATE_EOF;
417 return 0;
418 }
419
420 source->data_size = 0;
421 source->state = STATE_LINE;
422
423 return 0; /* continue */
424 default:
425 assert_not_reached("wtf?");
426 }
427 }
428
429 int process_source(RemoteSource *source, bool compress, bool seal) {
430 size_t remain, target;
431 int r;
432
433 assert(source);
434 assert(source->writer);
435
436 r = process_data(source);
437 if (r <= 0)
438 return r;
439
440 /* We have a full event */
441 log_trace("Received a full event from source@%p fd:%d (%s)",
442 source, source->fd, source->name);
443
444 if (!source->iovw.count) {
445 log_warning("Entry with no payload, skipping");
446 goto freeing;
447 }
448
449 assert(source->iovw.iovec);
450 assert(source->iovw.count);
451
452 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
453 if (r < 0)
454 log_error_errno(r, "Failed to write entry of %zu bytes: %m",
455 iovw_size(&source->iovw));
456 else
457 r = 1;
458
459 freeing:
460 iovw_free_contents(&source->iovw);
461
462 /* possibly reset buffer position */
463 remain = source->filled - source->offset;
464
465 if (remain == 0) /* no brainer */
466 source->offset = source->scanned = source->filled = 0;
467 else if (source->offset > source->size - source->filled &&
468 source->offset > remain) {
469 memcpy(source->buf, source->buf + source->offset, remain);
470 source->offset = source->scanned = 0;
471 source->filled = remain;
472 }
473
474 target = source->size;
475 while (target > 16 * LINE_CHUNK && remain < target / 2)
476 target /= 2;
477 if (target < source->size) {
478 char *tmp;
479
480 tmp = realloc(source->buf, target);
481 if (!tmp)
482 log_warning("Failed to reallocate buffer to (smaller) size %zu",
483 target);
484 else {
485 log_debug("Reallocated buffer from %zu to %zu bytes",
486 source->size, target);
487 source->buf = tmp;
488 source->size = target;
489 }
490 }
491
492 return r;
493 }