]>
Commit | Line | Data |
---|---|---|
fdfccdbc ZJS |
1 | /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ |
2 | ||
3 | /*** | |
4 | This file is part of systemd. | |
5 | ||
6 | Copyright 2014 Zbigniew Jędrzejewski-Szmek | |
7 | ||
8 | systemd is free software; you can redistribute it and/or modify it | |
9 | under the terms of the GNU Lesser General Public License as published by | |
10 | the Free Software Foundation; either version 2.1 of the License, or | |
11 | (at your option) any later version. | |
12 | ||
13 | systemd is distributed in the hope that it will be useful, but | |
14 | WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | Lesser General Public License for more details. | |
17 | ||
18 | You should have received a copy of the GNU Lesser General Public License | |
19 | along with systemd; If not, see <http://www.gnu.org/licenses/>. | |
20 | ***/ | |
21 | ||
b5efdb8a | 22 | #include "alloc-util.h" |
3ffd4af2 LP |
23 | #include "fd-util.h" |
24 | #include "journal-remote-parse.h" | |
fdfccdbc | 25 | #include "journald-native.h" |
6bedfcbb | 26 | #include "parse-util.h" |
07630cea | 27 | #include "string-util.h" |
fdfccdbc | 28 | |
/* Granularity (in bytes) by which the line buffer is grown. Parenthesized so
 * the expansion stays a single operand in any surrounding expression. */
#define LINE_CHUNK (8*1024u)
fdfccdbc ZJS |
30 | |
31 | void source_free(RemoteSource *source) { | |
32 | if (!source) | |
33 | return; | |
34 | ||
9ff48d09 | 35 | if (source->fd >= 0 && !source->passive_fd) { |
fdfccdbc | 36 | log_debug("Closing fd:%d (%s)", source->fd, source->name); |
8201af08 | 37 | safe_close(source->fd); |
fdfccdbc | 38 | } |
9ff48d09 | 39 | |
fdfccdbc ZJS |
40 | free(source->name); |
41 | free(source->buf); | |
42 | iovw_free_contents(&source->iovw); | |
8201af08 | 43 | |
1fa2f38f | 44 | log_debug("Writer ref count %i", source->writer->n_ref); |
9ff48d09 ZJS |
45 | writer_unref(source->writer); |
46 | ||
8201af08 | 47 | sd_event_source_unref(source->event); |
043945b9 | 48 | sd_event_source_unref(source->buffer_event); |
8201af08 | 49 | |
fdfccdbc ZJS |
50 | free(source); |
51 | } | |
52 | ||
9ff48d09 ZJS |
53 | /** |
54 | * Initialize zero-filled source with given values. On success, takes | |
55 | * ownerhship of fd and writer, otherwise does not touch them. | |
56 | */ | |
57 | RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) { | |
58 | ||
59 | RemoteSource *source; | |
60 | ||
61 | log_debug("Creating source for %sfd:%d (%s)", | |
62 | passive_fd ? "passive " : "", fd, name); | |
63 | ||
64 | assert(fd >= 0); | |
65 | ||
66 | source = new0(RemoteSource, 1); | |
67 | if (!source) | |
68 | return NULL; | |
69 | ||
70 | source->fd = fd; | |
71 | source->passive_fd = passive_fd; | |
72 | source->name = name; | |
73 | source->writer = writer; | |
74 | ||
75 | return source; | |
76 | } | |
77 | ||
92b10cbc ZJS |
78 | static char* realloc_buffer(RemoteSource *source, size_t size) { |
79 | char *b, *old = source->buf; | |
80 | ||
81 | b = GREEDY_REALLOC(source->buf, source->size, size); | |
82 | if (!b) | |
83 | return NULL; | |
84 | ||
85 | iovw_rebase(&source->iovw, old, source->buf); | |
86 | ||
87 | return b; | |
88 | } | |
89 | ||
fdfccdbc | 90 | static int get_line(RemoteSource *source, char **line, size_t *size) { |
92b10cbc | 91 | ssize_t n; |
60921179 | 92 | char *c = NULL; |
fdfccdbc ZJS |
93 | |
94 | assert(source); | |
95 | assert(source->state == STATE_LINE); | |
92b10cbc | 96 | assert(source->offset <= source->filled); |
fdfccdbc ZJS |
97 | assert(source->filled <= source->size); |
98 | assert(source->buf == NULL || source->size > 0); | |
9ff48d09 | 99 | assert(source->fd >= 0); |
fdfccdbc | 100 | |
57255510 | 101 | for (;;) { |
92b10cbc ZJS |
102 | if (source->buf) { |
103 | size_t start = MAX(source->scanned, source->offset); | |
104 | ||
105 | c = memchr(source->buf + start, '\n', | |
106 | source->filled - start); | |
107 | if (c != NULL) | |
108 | break; | |
109 | } | |
851d4e2a ZJS |
110 | |
111 | source->scanned = source->filled; | |
112 | if (source->scanned >= DATA_SIZE_MAX) { | |
113 | log_error("Entry is bigger than %u bytes.", DATA_SIZE_MAX); | |
114 | return -E2BIG; | |
115 | } | |
60921179 | 116 | |
9ff48d09 | 117 | if (source->passive_fd) |
851d4e2a | 118 | /* we have to wait for some data to come to us */ |
ff55c3c7 | 119 | return -EAGAIN; |
fdfccdbc | 120 | |
39d0fd9c ZJS |
121 | /* We know that source->filled is at most DATA_SIZE_MAX, so if |
122 | we reallocate it, we'll increase the size at least a bit. */ | |
123 | assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX); | |
851d4e2a | 124 | if (source->size - source->filled < LINE_CHUNK && |
39d0fd9c | 125 | !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX))) |
851d4e2a | 126 | return log_oom(); |
9786767a | 127 | |
39d0fd9c | 128 | assert(source->buf); |
4a0a6ac0 | 129 | assert(source->size - source->filled >= LINE_CHUNK || |
92b10cbc | 130 | source->size == ENTRY_SIZE_MAX); |
fdfccdbc | 131 | |
39d0fd9c ZJS |
132 | n = read(source->fd, |
133 | source->buf + source->filled, | |
4a0a6ac0 | 134 | source->size - source->filled); |
851d4e2a | 135 | if (n < 0) { |
ff55c3c7 | 136 | if (errno != EAGAIN) |
39d0fd9c ZJS |
137 | log_error_errno(errno, "read(%d, ..., %zu): %m", |
138 | source->fd, | |
1fa2f38f | 139 | source->size - source->filled); |
851d4e2a ZJS |
140 | return -errno; |
141 | } else if (n == 0) | |
142 | return 0; | |
fdfccdbc | 143 | |
851d4e2a ZJS |
144 | source->filled += n; |
145 | } | |
fdfccdbc | 146 | |
92b10cbc ZJS |
147 | *line = source->buf + source->offset; |
148 | *size = c + 1 - source->buf - source->offset; | |
149 | source->offset += *size; | |
fdfccdbc ZJS |
150 | |
151 | return 1; | |
152 | } | |
153 | ||
cc64d017 ZJS |
154 | int push_data(RemoteSource *source, const char *data, size_t size) { |
155 | assert(source); | |
156 | assert(source->state != STATE_EOF); | |
157 | ||
92b10cbc | 158 | if (!realloc_buffer(source, source->filled + size)) { |
851d4e2a ZJS |
159 | log_error("Failed to store received data of size %zu " |
160 | "(in addition to existing %zu bytes with %zu filled): %s", | |
161 | size, source->size, source->filled, strerror(ENOMEM)); | |
162 | return -ENOMEM; | |
163 | } | |
cc64d017 ZJS |
164 | |
165 | memcpy(source->buf + source->filled, data, size); | |
166 | source->filled += size; | |
167 | ||
168 | return 0; | |
169 | } | |
170 | ||
fdfccdbc | 171 | static int fill_fixed_size(RemoteSource *source, void **data, size_t size) { |
fdfccdbc ZJS |
172 | |
173 | assert(source); | |
174 | assert(source->state == STATE_DATA_START || | |
175 | source->state == STATE_DATA || | |
176 | source->state == STATE_DATA_FINISH); | |
177 | assert(size <= DATA_SIZE_MAX); | |
92b10cbc | 178 | assert(source->offset <= source->filled); |
fdfccdbc ZJS |
179 | assert(source->filled <= source->size); |
180 | assert(source->buf != NULL || source->size == 0); | |
181 | assert(source->buf == NULL || source->size > 0); | |
9ff48d09 | 182 | assert(source->fd >= 0); |
fdfccdbc ZJS |
183 | assert(data); |
184 | ||
92b10cbc ZJS |
185 | while (source->filled - source->offset < size) { |
186 | int n; | |
187 | ||
9ff48d09 | 188 | if (source->passive_fd) |
9786767a | 189 | /* we have to wait for some data to come to us */ |
ff55c3c7 | 190 | return -EAGAIN; |
9786767a | 191 | |
92b10cbc | 192 | if (!realloc_buffer(source, source->offset + size)) |
fdfccdbc ZJS |
193 | return log_oom(); |
194 | ||
195 | n = read(source->fd, source->buf + source->filled, | |
196 | source->size - source->filled); | |
197 | if (n < 0) { | |
ff55c3c7 | 198 | if (errno != EAGAIN) |
1fa2f38f ZJS |
199 | log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd, |
200 | source->size - source->filled); | |
fdfccdbc ZJS |
201 | return -errno; |
202 | } else if (n == 0) | |
203 | return 0; | |
204 | ||
205 | source->filled += n; | |
206 | } | |
207 | ||
92b10cbc ZJS |
208 | *data = source->buf + source->offset; |
209 | source->offset += size; | |
fdfccdbc ZJS |
210 | |
211 | return 1; | |
212 | } | |
213 | ||
214 | static int get_data_size(RemoteSource *source) { | |
215 | int r; | |
92b10cbc | 216 | void *data; |
fdfccdbc ZJS |
217 | |
218 | assert(source); | |
219 | assert(source->state == STATE_DATA_START); | |
220 | assert(source->data_size == 0); | |
221 | ||
222 | r = fill_fixed_size(source, &data, sizeof(uint64_t)); | |
223 | if (r <= 0) | |
224 | return r; | |
225 | ||
226 | source->data_size = le64toh( *(uint64_t *) data ); | |
227 | if (source->data_size > DATA_SIZE_MAX) { | |
a83f4037 | 228 | log_error("Stream declares field with size %zu > DATA_SIZE_MAX = %u", |
fdfccdbc ZJS |
229 | source->data_size, DATA_SIZE_MAX); |
230 | return -EINVAL; | |
231 | } | |
232 | if (source->data_size == 0) | |
233 | log_warning("Binary field with zero length"); | |
234 | ||
235 | return 1; | |
236 | } | |
237 | ||
238 | static int get_data_data(RemoteSource *source, void **data) { | |
239 | int r; | |
240 | ||
241 | assert(source); | |
242 | assert(data); | |
243 | assert(source->state == STATE_DATA); | |
244 | ||
245 | r = fill_fixed_size(source, data, source->data_size); | |
246 | if (r <= 0) | |
247 | return r; | |
248 | ||
249 | return 1; | |
250 | } | |
251 | ||
252 | static int get_data_newline(RemoteSource *source) { | |
253 | int r; | |
92b10cbc | 254 | char *data; |
fdfccdbc ZJS |
255 | |
256 | assert(source); | |
257 | assert(source->state == STATE_DATA_FINISH); | |
258 | ||
259 | r = fill_fixed_size(source, (void**) &data, 1); | |
260 | if (r <= 0) | |
261 | return r; | |
262 | ||
263 | assert(data); | |
264 | if (*data != '\n') { | |
265 | log_error("expected newline, got '%c'", *data); | |
266 | return -EINVAL; | |
267 | } | |
268 | ||
269 | return 1; | |
270 | } | |
271 | ||
272 | static int process_dunder(RemoteSource *source, char *line, size_t n) { | |
273 | const char *timestamp; | |
274 | int r; | |
275 | ||
276 | assert(line); | |
277 | assert(n > 0); | |
278 | assert(line[n-1] == '\n'); | |
279 | ||
280 | /* XXX: is it worth to support timestamps in extended format? | |
281 | * We don't produce them, but who knows... */ | |
282 | ||
283 | timestamp = startswith(line, "__CURSOR="); | |
284 | if (timestamp) | |
285 | /* ignore __CURSOR */ | |
286 | return 1; | |
287 | ||
288 | timestamp = startswith(line, "__REALTIME_TIMESTAMP="); | |
289 | if (timestamp) { | |
290 | long long unsigned x; | |
291 | line[n-1] = '\0'; | |
292 | r = safe_atollu(timestamp, &x); | |
293 | if (r < 0) | |
294 | log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp); | |
295 | else | |
296 | source->ts.realtime = x; | |
297 | return r < 0 ? r : 1; | |
298 | } | |
299 | ||
300 | timestamp = startswith(line, "__MONOTONIC_TIMESTAMP="); | |
301 | if (timestamp) { | |
302 | long long unsigned x; | |
303 | line[n-1] = '\0'; | |
304 | r = safe_atollu(timestamp, &x); | |
305 | if (r < 0) | |
306 | log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp); | |
307 | else | |
308 | source->ts.monotonic = x; | |
309 | return r < 0 ? r : 1; | |
310 | } | |
311 | ||
312 | timestamp = startswith(line, "__"); | |
313 | if (timestamp) { | |
314 | log_notice("Unknown dunder line %s", line); | |
315 | return 1; | |
316 | } | |
317 | ||
318 | /* no dunder */ | |
319 | return 0; | |
320 | } | |
321 | ||
be7e1319 | 322 | static int process_data(RemoteSource *source) { |
fdfccdbc ZJS |
323 | int r; |
324 | ||
325 | switch(source->state) { | |
326 | case STATE_LINE: { | |
327 | char *line, *sep; | |
a7f7d1bd | 328 | size_t n = 0; |
fdfccdbc ZJS |
329 | |
330 | assert(source->data_size == 0); | |
331 | ||
332 | r = get_line(source, &line, &n); | |
333 | if (r < 0) | |
334 | return r; | |
335 | if (r == 0) { | |
336 | source->state = STATE_EOF; | |
337 | return r; | |
338 | } | |
339 | assert(n > 0); | |
340 | assert(line[n-1] == '\n'); | |
341 | ||
342 | if (n == 1) { | |
cb41ff29 | 343 | log_trace("Received empty line, event is ready"); |
fdfccdbc ZJS |
344 | return 1; |
345 | } | |
346 | ||
347 | r = process_dunder(source, line, n); | |
92b10cbc | 348 | if (r != 0) |
fdfccdbc | 349 | return r < 0 ? r : 0; |
fdfccdbc ZJS |
350 | |
351 | /* MESSAGE=xxx\n | |
352 | or | |
353 | COREDUMP\n | |
354 | LLLLLLLL0011223344...\n | |
355 | */ | |
356 | sep = memchr(line, '=', n); | |
09d801a8 | 357 | if (sep) { |
fdfccdbc ZJS |
358 | /* chomp newline */ |
359 | n--; | |
09d801a8 ZJS |
360 | |
361 | r = iovw_put(&source->iovw, line, n); | |
362 | if (r < 0) | |
363 | return r; | |
364 | } else { | |
fdfccdbc ZJS |
365 | /* replace \n with = */ |
366 | line[n-1] = '='; | |
fdfccdbc | 367 | |
09d801a8 ZJS |
368 | source->field_len = n; |
369 | source->state = STATE_DATA_START; | |
370 | ||
371 | /* we cannot put the field in iovec until we have all data */ | |
fdfccdbc ZJS |
372 | } |
373 | ||
09d801a8 ZJS |
374 | log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary"); |
375 | ||
fdfccdbc ZJS |
376 | return 0; /* continue */ |
377 | } | |
378 | ||
379 | case STATE_DATA_START: | |
380 | assert(source->data_size == 0); | |
381 | ||
382 | r = get_data_size(source); | |
dd87b184 | 383 | // log_debug("get_data_size() -> %d", r); |
fdfccdbc ZJS |
384 | if (r < 0) |
385 | return r; | |
386 | if (r == 0) { | |
387 | source->state = STATE_EOF; | |
388 | return 0; | |
389 | } | |
390 | ||
391 | source->state = source->data_size > 0 ? | |
392 | STATE_DATA : STATE_DATA_FINISH; | |
393 | ||
394 | return 0; /* continue */ | |
395 | ||
396 | case STATE_DATA: { | |
397 | void *data; | |
09d801a8 | 398 | char *field; |
fdfccdbc ZJS |
399 | |
400 | assert(source->data_size > 0); | |
401 | ||
402 | r = get_data_data(source, &data); | |
dd87b184 | 403 | // log_debug("get_data_data() -> %d", r); |
fdfccdbc ZJS |
404 | if (r < 0) |
405 | return r; | |
406 | if (r == 0) { | |
407 | source->state = STATE_EOF; | |
408 | return 0; | |
409 | } | |
410 | ||
411 | assert(data); | |
412 | ||
09d801a8 ZJS |
413 | field = (char*) data - sizeof(uint64_t) - source->field_len; |
414 | memmove(field + sizeof(uint64_t), field, source->field_len); | |
415 | ||
416 | r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size); | |
417 | if (r < 0) | |
fdfccdbc | 418 | return r; |
fdfccdbc ZJS |
419 | |
420 | source->state = STATE_DATA_FINISH; | |
421 | ||
422 | return 0; /* continue */ | |
423 | } | |
424 | ||
425 | case STATE_DATA_FINISH: | |
426 | r = get_data_newline(source); | |
dd87b184 | 427 | // log_debug("get_data_newline() -> %d", r); |
fdfccdbc ZJS |
428 | if (r < 0) |
429 | return r; | |
430 | if (r == 0) { | |
431 | source->state = STATE_EOF; | |
432 | return 0; | |
433 | } | |
434 | ||
435 | source->data_size = 0; | |
436 | source->state = STATE_LINE; | |
437 | ||
438 | return 0; /* continue */ | |
439 | default: | |
440 | assert_not_reached("wtf?"); | |
441 | } | |
442 | } | |
443 | ||
9ff48d09 | 444 | int process_source(RemoteSource *source, bool compress, bool seal) { |
92b10cbc | 445 | size_t remain, target; |
fdfccdbc ZJS |
446 | int r; |
447 | ||
448 | assert(source); | |
9ff48d09 | 449 | assert(source->writer); |
fdfccdbc ZJS |
450 | |
451 | r = process_data(source); | |
452 | if (r <= 0) | |
453 | return r; | |
454 | ||
455 | /* We have a full event */ | |
0e72da6f | 456 | log_trace("Received full event from source@%p fd:%d (%s)", |
a83f4037 | 457 | source, source->fd, source->name); |
fdfccdbc ZJS |
458 | |
459 | if (!source->iovw.count) { | |
460 | log_warning("Entry with no payload, skipping"); | |
461 | goto freeing; | |
462 | } | |
463 | ||
464 | assert(source->iovw.iovec); | |
465 | assert(source->iovw.count); | |
466 | ||
9ff48d09 | 467 | r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal); |
fdfccdbc | 468 | if (r < 0) |
c33b3297 MS |
469 | log_error_errno(r, "Failed to write entry of %zu bytes: %m", |
470 | iovw_size(&source->iovw)); | |
fdfccdbc ZJS |
471 | else |
472 | r = 1; | |
473 | ||
474 | freeing: | |
475 | iovw_free_contents(&source->iovw); | |
92b10cbc ZJS |
476 | |
477 | /* possibly reset buffer position */ | |
478 | remain = source->filled - source->offset; | |
479 | ||
480 | if (remain == 0) /* no brainer */ | |
481 | source->offset = source->scanned = source->filled = 0; | |
482 | else if (source->offset > source->size - source->filled && | |
483 | source->offset > remain) { | |
484 | memcpy(source->buf, source->buf + source->offset, remain); | |
485 | source->offset = source->scanned = 0; | |
486 | source->filled = remain; | |
487 | } | |
488 | ||
489 | target = source->size; | |
490 | while (target > 16 * LINE_CHUNK && remain < target / 2) | |
491 | target /= 2; | |
492 | if (target < source->size) { | |
493 | char *tmp; | |
494 | ||
495 | tmp = realloc(source->buf, target); | |
e4c38cc3 | 496 | if (!tmp) |
92b10cbc ZJS |
497 | log_warning("Failed to reallocate buffer to (smaller) size %zu", |
498 | target); | |
499 | else { | |
500 | log_debug("Reallocated buffer from %zu to %zu bytes", | |
501 | source->size, target); | |
502 | source->buf = tmp; | |
503 | source->size = target; | |
504 | } | |
505 | } | |
506 | ||
fdfccdbc ZJS |
507 | return r; |
508 | } |