]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-remote-parse.c
journal-remote: rework fd and writer reference handling
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
/* Granularity, in bytes, used when growing or allocating the line buffer.
 * Parenthesized so the macro expands as a single value in any expression. */
#define LINE_CHUNK (8*1024u)
26
27 void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
31 if (source->fd >= 0 && !source->passive_fd) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 safe_close(source->fd);
34 }
35
36 free(source->name);
37 free(source->buf);
38 iovw_free_contents(&source->iovw);
39
40 log_debug("Writer ref count %u", source->writer->n_ref);
41 writer_unref(source->writer);
42
43 sd_event_source_unref(source->event);
44
45 free(source);
46 }
47
48 /**
49 * Initialize zero-filled source with given values. On success, takes
50 * ownerhship of fd and writer, otherwise does not touch them.
51 */
52 RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
53
54 RemoteSource *source;
55
56 log_debug("Creating source for %sfd:%d (%s)",
57 passive_fd ? "passive " : "", fd, name);
58
59 assert(fd >= 0);
60
61 source = new0(RemoteSource, 1);
62 if (!source)
63 return NULL;
64
65 source->fd = fd;
66 source->passive_fd = passive_fd;
67 source->name = name;
68 source->writer = writer;
69
70 return source;
71 }
72
73 static int get_line(RemoteSource *source, char **line, size_t *size) {
74 ssize_t n, remain;
75 char *c = NULL;
76 char *newbuf = NULL;
77 size_t newsize = 0;
78
79 assert(source);
80 assert(source->state == STATE_LINE);
81 assert(source->filled <= source->size);
82 assert(source->buf == NULL || source->size > 0);
83 assert(source->fd >= 0);
84
85 while (true) {
86 if (source->buf)
87 c = memchr(source->buf + source->scanned, '\n',
88 source->filled - source->scanned);
89 if (c != NULL)
90 break;
91
92 source->scanned = source->filled;
93 if (source->scanned >= DATA_SIZE_MAX) {
94 log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX);
95 return -E2BIG;
96 }
97
98 if (source->passive_fd)
99 /* we have to wait for some data to come to us */
100 return -EWOULDBLOCK;
101
102 if (source->size - source->filled < LINE_CHUNK &&
103 !GREEDY_REALLOC(source->buf, source->size,
104 MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
105 return log_oom();
106
107 assert(source->size - source->filled >= LINE_CHUNK ||
108 source->size == DATA_SIZE_MAX);
109
110 // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
111 // to accomodate such big fields.
112
113 n = read(source->fd, source->buf + source->filled,
114 source->size - source->filled);
115 if (n < 0) {
116 if (errno != EAGAIN && errno != EWOULDBLOCK)
117 log_error("read(%d, ..., %zd): %m", source->fd,
118 source->size - source->filled);
119 return -errno;
120 } else if (n == 0)
121 return 0;
122
123 source->filled += n;
124 }
125
126 *line = source->buf;
127 *size = c + 1 - source->buf;
128
129 /* Check if something remains */
130 remain = source->buf + source->filled - c - 1;
131 assert(remain >= 0);
132 if (remain) {
133 newsize = MAX(remain, LINE_CHUNK);
134 newbuf = malloc(newsize);
135 if (!newbuf)
136 return log_oom();
137 memcpy(newbuf, c + 1, remain);
138 }
139 source->buf = newbuf;
140 source->size = newsize;
141 source->filled = remain;
142 source->scanned = 0;
143
144 return 1;
145 }
146
147 int push_data(RemoteSource *source, const char *data, size_t size) {
148 assert(source);
149 assert(source->state != STATE_EOF);
150
151 if (!GREEDY_REALLOC(source->buf, source->size,
152 source->filled + size)) {
153 log_error("Failed to store received data of size %zu "
154 "(in addition to existing %zu bytes with %zu filled): %s",
155 size, source->size, source->filled, strerror(ENOMEM));
156 return -ENOMEM;
157 }
158
159 memcpy(source->buf + source->filled, data, size);
160 source->filled += size;
161
162 return 0;
163 }
164
165 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
166 int n;
167 char *newbuf = NULL;
168 size_t newsize = 0, remain;
169
170 assert(source);
171 assert(source->state == STATE_DATA_START ||
172 source->state == STATE_DATA ||
173 source->state == STATE_DATA_FINISH);
174 assert(size <= DATA_SIZE_MAX);
175 assert(source->filled <= source->size);
176 assert(source->scanned <= source->filled);
177 assert(source->buf != NULL || source->size == 0);
178 assert(source->buf == NULL || source->size > 0);
179 assert(source->fd >= 0);
180 assert(data);
181
182 while(source->filled < size) {
183 if (source->passive_fd)
184 /* we have to wait for some data to come to us */
185 return -EWOULDBLOCK;
186
187 if (!GREEDY_REALLOC(source->buf, source->size, size))
188 return log_oom();
189
190 n = read(source->fd, source->buf + source->filled,
191 source->size - source->filled);
192 if (n < 0) {
193 if (errno != EAGAIN && errno != EWOULDBLOCK)
194 log_error("read(%d, ..., %zd): %m", source->fd,
195 source->size - source->filled);
196 return -errno;
197 } else if (n == 0)
198 return 0;
199
200 source->filled += n;
201 }
202
203 *data = source->buf;
204
205 /* Check if something remains */
206 assert(size <= source->filled);
207 remain = source->filled - size;
208 if (remain) {
209 newsize = MAX(remain, LINE_CHUNK);
210 newbuf = malloc(newsize);
211 if (!newbuf)
212 return log_oom();
213 memcpy(newbuf, source->buf + size, remain);
214 }
215 source->buf = newbuf;
216 source->size = newsize;
217 source->filled = remain;
218 source->scanned = 0;
219
220 return 1;
221 }
222
223 static int get_data_size(RemoteSource *source) {
224 int r;
225 _cleanup_free_ void *data = NULL;
226
227 assert(source);
228 assert(source->state == STATE_DATA_START);
229 assert(source->data_size == 0);
230
231 r = fill_fixed_size(source, &data, sizeof(uint64_t));
232 if (r <= 0)
233 return r;
234
235 source->data_size = le64toh( *(uint64_t *) data );
236 if (source->data_size > DATA_SIZE_MAX) {
237 log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u",
238 source->data_size, DATA_SIZE_MAX);
239 return -EINVAL;
240 }
241 if (source->data_size == 0)
242 log_warning("Binary field with zero length");
243
244 return 1;
245 }
246
247 static int get_data_data(RemoteSource *source, void **data) {
248 int r;
249
250 assert(source);
251 assert(data);
252 assert(source->state == STATE_DATA);
253
254 r = fill_fixed_size(source, data, source->data_size);
255 if (r <= 0)
256 return r;
257
258 return 1;
259 }
260
261 static int get_data_newline(RemoteSource *source) {
262 int r;
263 _cleanup_free_ char *data = NULL;
264
265 assert(source);
266 assert(source->state == STATE_DATA_FINISH);
267
268 r = fill_fixed_size(source, (void**) &data, 1);
269 if (r <= 0)
270 return r;
271
272 assert(data);
273 if (*data != '\n') {
274 log_error("expected newline, got '%c'", *data);
275 return -EINVAL;
276 }
277
278 return 1;
279 }
280
281 static int process_dunder(RemoteSource *source, char *line, size_t n) {
282 const char *timestamp;
283 int r;
284
285 assert(line);
286 assert(n > 0);
287 assert(line[n-1] == '\n');
288
289 /* XXX: is it worth to support timestamps in extended format?
290 * We don't produce them, but who knows... */
291
292 timestamp = startswith(line, "__CURSOR=");
293 if (timestamp)
294 /* ignore __CURSOR */
295 return 1;
296
297 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
298 if (timestamp) {
299 long long unsigned x;
300 line[n-1] = '\0';
301 r = safe_atollu(timestamp, &x);
302 if (r < 0)
303 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
304 else
305 source->ts.realtime = x;
306 return r < 0 ? r : 1;
307 }
308
309 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
310 if (timestamp) {
311 long long unsigned x;
312 line[n-1] = '\0';
313 r = safe_atollu(timestamp, &x);
314 if (r < 0)
315 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
316 else
317 source->ts.monotonic = x;
318 return r < 0 ? r : 1;
319 }
320
321 timestamp = startswith(line, "__");
322 if (timestamp) {
323 log_notice("Unknown dunder line %s", line);
324 return 1;
325 }
326
327 /* no dunder */
328 return 0;
329 }
330
331 int process_data(RemoteSource *source) {
332 int r;
333
334 switch(source->state) {
335 case STATE_LINE: {
336 char *line, *sep;
337 size_t n;
338
339 assert(source->data_size == 0);
340
341 r = get_line(source, &line, &n);
342 if (r < 0)
343 return r;
344 if (r == 0) {
345 source->state = STATE_EOF;
346 return r;
347 }
348 assert(n > 0);
349 assert(line[n-1] == '\n');
350
351 if (n == 1) {
352 log_debug("Received empty line, event is ready");
353 free(line);
354 return 1;
355 }
356
357 r = process_dunder(source, line, n);
358 if (r != 0) {
359 free(line);
360 return r < 0 ? r : 0;
361 }
362
363 /* MESSAGE=xxx\n
364 or
365 COREDUMP\n
366 LLLLLLLL0011223344...\n
367 */
368 sep = memchr(line, '=', n);
369 if (sep)
370 /* chomp newline */
371 n--;
372 else
373 /* replace \n with = */
374 line[n-1] = '=';
375 log_debug("Received: %.*s", (int) n, line);
376
377 r = iovw_put(&source->iovw, line, n);
378 if (r < 0) {
379 log_error("Failed to put line in iovect");
380 free(line);
381 return r;
382 }
383
384 if (!sep)
385 source->state = STATE_DATA_START;
386 return 0; /* continue */
387 }
388
389 case STATE_DATA_START:
390 assert(source->data_size == 0);
391
392 r = get_data_size(source);
393 log_debug("get_data_size() -> %d", r);
394 if (r < 0)
395 return r;
396 if (r == 0) {
397 source->state = STATE_EOF;
398 return 0;
399 }
400
401 source->state = source->data_size > 0 ?
402 STATE_DATA : STATE_DATA_FINISH;
403
404 return 0; /* continue */
405
406 case STATE_DATA: {
407 void *data;
408
409 assert(source->data_size > 0);
410
411 r = get_data_data(source, &data);
412 log_debug("get_data_data() -> %d", r);
413 if (r < 0)
414 return r;
415 if (r == 0) {
416 source->state = STATE_EOF;
417 return 0;
418 }
419
420 assert(data);
421
422 r = iovw_put(&source->iovw, data, source->data_size);
423 if (r < 0) {
424 log_error("failed to put binary buffer in iovect");
425 return r;
426 }
427
428 source->state = STATE_DATA_FINISH;
429
430 return 0; /* continue */
431 }
432
433 case STATE_DATA_FINISH:
434 r = get_data_newline(source);
435 log_debug("get_data_newline() -> %d", r);
436 if (r < 0)
437 return r;
438 if (r == 0) {
439 source->state = STATE_EOF;
440 return 0;
441 }
442
443 source->data_size = 0;
444 source->state = STATE_LINE;
445
446 return 0; /* continue */
447 default:
448 assert_not_reached("wtf?");
449 }
450 }
451
452 int process_source(RemoteSource *source, bool compress, bool seal) {
453 int r;
454
455 assert(source);
456 assert(source->writer);
457
458 r = process_data(source);
459 if (r <= 0)
460 return r;
461
462 /* We have a full event */
463 log_debug("Received a full event from source@%p fd:%d (%s)",
464 source, source->fd, source->name);
465
466 if (!source->iovw.count) {
467 log_warning("Entry with no payload, skipping");
468 goto freeing;
469 }
470
471 assert(source->iovw.iovec);
472 assert(source->iovw.count);
473
474 r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
475 if (r < 0)
476 log_error("Failed to write entry of %zu bytes: %s",
477 iovw_size(&source->iovw), strerror(-r));
478 else
479 r = 1;
480
481 freeing:
482 iovw_free_contents(&source->iovw);
483 return r;
484 }