]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-remote-parse.c
journal-remote: small fixes
[thirdparty/systemd.git] / src / journal-remote / journal-remote-parse.c
CommitLineData
fdfccdbc
ZJS
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include "journal-remote-parse.h"
23#include "journald-native.h"
24
25#define LINE_CHUNK 1024u
26
27void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 close(source->fd);
34 }
35 free(source->name);
36 free(source->buf);
37 iovw_free_contents(&source->iovw);
38 free(source);
39}
40
41static int get_line(RemoteSource *source, char **line, size_t *size) {
42 ssize_t n, remain;
60921179 43 char *c = NULL;
fdfccdbc
ZJS
44 char *newbuf = NULL;
45 size_t newsize = 0;
46
47 assert(source);
48 assert(source->state == STATE_LINE);
49 assert(source->filled <= source->size);
50 assert(source->buf == NULL || source->size > 0);
51
60921179
TA
52 if (source->buf)
53 c = memchr(source->buf, '\n', source->filled);
54
fdfccdbc
ZJS
55 if (c != NULL)
56 goto docopy;
57
58 resize:
9786767a
ZJS
59 if (source->fd < 0)
60 /* we have to wait for some data to come to us */
61 return -EWOULDBLOCK;
62
fdfccdbc
ZJS
63 if (source->size - source->filled < LINE_CHUNK) {
64 // XXX: add check for maximum line length
65
66 if (!GREEDY_REALLOC(source->buf, source->size,
67 source->filled + LINE_CHUNK))
68 return log_oom();
69 }
70 assert(source->size - source->filled >= LINE_CHUNK);
71
72 n = read(source->fd, source->buf + source->filled,
73 source->size - source->filled);
74 if (n < 0) {
75 if (errno != EAGAIN && errno != EWOULDBLOCK)
76 log_error("read(%d, ..., %zd): %m", source->fd,
77 source->size - source->filled);
78 return -errno;
79 } else if (n == 0)
80 return 0;
81
82 c = memchr(source->buf + source->filled, '\n', n);
83 source->filled += n;
84
85 if (c == NULL)
86 goto resize;
87
88 docopy:
89 *line = source->buf;
90 *size = c + 1 - source->buf;
91
92 /* Check if something remains */
93 remain = source->buf + source->filled - c - 1;
94 assert(remain >= 0);
95 if (remain) {
96 newsize = MAX(remain, LINE_CHUNK);
97 newbuf = malloc(newsize);
98 if (!newbuf)
99 return log_oom();
100 memcpy(newbuf, c + 1, remain);
101 }
102 source->buf = newbuf;
103 source->size = newsize;
104 source->filled = remain;
105
106 return 1;
107}
108
cc64d017
ZJS
109int push_data(RemoteSource *source, const char *data, size_t size) {
110 assert(source);
111 assert(source->state != STATE_EOF);
112
113 if (!GREEDY_REALLOC(source->buf, source->size,
114 source->filled + size))
115 return log_oom();
116
117 memcpy(source->buf + source->filled, data, size);
118 source->filled += size;
119
120 return 0;
121}
122
fdfccdbc
ZJS
123static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
124 int n;
125 char *newbuf = NULL;
126 size_t newsize = 0, remain;
127
128 assert(source);
129 assert(source->state == STATE_DATA_START ||
130 source->state == STATE_DATA ||
131 source->state == STATE_DATA_FINISH);
132 assert(size <= DATA_SIZE_MAX);
133 assert(source->filled <= source->size);
134 assert(source->buf != NULL || source->size == 0);
135 assert(source->buf == NULL || source->size > 0);
136 assert(data);
137
138 while(source->filled < size) {
9786767a
ZJS
139 if (source->fd < 0)
140 /* we have to wait for some data to come to us */
141 return -EWOULDBLOCK;
142
fdfccdbc
ZJS
143 if (!GREEDY_REALLOC(source->buf, source->size, size))
144 return log_oom();
145
146 n = read(source->fd, source->buf + source->filled,
147 source->size - source->filled);
148 if (n < 0) {
149 if (errno != EAGAIN && errno != EWOULDBLOCK)
150 log_error("read(%d, ..., %zd): %m", source->fd,
151 source->size - source->filled);
152 return -errno;
153 } else if (n == 0)
154 return 0;
155
156 source->filled += n;
157 }
158
159 *data = source->buf;
160
161 /* Check if something remains */
162 assert(size <= source->filled);
163 remain = source->filled - size;
164 if (remain) {
165 newsize = MAX(remain, LINE_CHUNK);
166 newbuf = malloc(newsize);
167 if (!newbuf)
168 return log_oom();
169 memcpy(newbuf, source->buf + size, remain);
170 }
171 source->buf = newbuf;
172 source->size = newsize;
173 source->filled = remain;
174
175 return 1;
176}
177
178static int get_data_size(RemoteSource *source) {
179 int r;
c8b32e11 180 _cleanup_free_ void *data = NULL;
fdfccdbc
ZJS
181
182 assert(source);
183 assert(source->state == STATE_DATA_START);
184 assert(source->data_size == 0);
185
186 r = fill_fixed_size(source, &data, sizeof(uint64_t));
187 if (r <= 0)
188 return r;
189
190 source->data_size = le64toh( *(uint64_t *) data );
191 if (source->data_size > DATA_SIZE_MAX) {
192 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
193 source->data_size, DATA_SIZE_MAX);
194 return -EINVAL;
195 }
196 if (source->data_size == 0)
197 log_warning("Binary field with zero length");
198
199 return 1;
200}
201
202static int get_data_data(RemoteSource *source, void **data) {
203 int r;
204
205 assert(source);
206 assert(data);
207 assert(source->state == STATE_DATA);
208
209 r = fill_fixed_size(source, data, source->data_size);
210 if (r <= 0)
211 return r;
212
213 return 1;
214}
215
216static int get_data_newline(RemoteSource *source) {
217 int r;
c8b32e11 218 _cleanup_free_ char *data = NULL;
fdfccdbc
ZJS
219
220 assert(source);
221 assert(source->state == STATE_DATA_FINISH);
222
223 r = fill_fixed_size(source, (void**) &data, 1);
224 if (r <= 0)
225 return r;
226
227 assert(data);
228 if (*data != '\n') {
229 log_error("expected newline, got '%c'", *data);
230 return -EINVAL;
231 }
232
233 return 1;
234}
235
236static int process_dunder(RemoteSource *source, char *line, size_t n) {
237 const char *timestamp;
238 int r;
239
240 assert(line);
241 assert(n > 0);
242 assert(line[n-1] == '\n');
243
244 /* XXX: is it worth to support timestamps in extended format?
245 * We don't produce them, but who knows... */
246
247 timestamp = startswith(line, "__CURSOR=");
248 if (timestamp)
249 /* ignore __CURSOR */
250 return 1;
251
252 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
253 if (timestamp) {
254 long long unsigned x;
255 line[n-1] = '\0';
256 r = safe_atollu(timestamp, &x);
257 if (r < 0)
258 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
259 else
260 source->ts.realtime = x;
261 return r < 0 ? r : 1;
262 }
263
264 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
265 if (timestamp) {
266 long long unsigned x;
267 line[n-1] = '\0';
268 r = safe_atollu(timestamp, &x);
269 if (r < 0)
270 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
271 else
272 source->ts.monotonic = x;
273 return r < 0 ? r : 1;
274 }
275
276 timestamp = startswith(line, "__");
277 if (timestamp) {
278 log_notice("Unknown dunder line %s", line);
279 return 1;
280 }
281
282 /* no dunder */
283 return 0;
284}
285
286int process_data(RemoteSource *source) {
287 int r;
288
289 switch(source->state) {
290 case STATE_LINE: {
291 char *line, *sep;
292 size_t n;
293
294 assert(source->data_size == 0);
295
296 r = get_line(source, &line, &n);
297 if (r < 0)
298 return r;
299 if (r == 0) {
300 source->state = STATE_EOF;
301 return r;
302 }
303 assert(n > 0);
304 assert(line[n-1] == '\n');
305
306 if (n == 1) {
307 log_debug("Received empty line, event is ready");
308 free(line);
309 return 1;
310 }
311
312 r = process_dunder(source, line, n);
313 if (r != 0) {
314 free(line);
315 return r < 0 ? r : 0;
316 }
317
318 /* MESSAGE=xxx\n
319 or
320 COREDUMP\n
321 LLLLLLLL0011223344...\n
322 */
323 sep = memchr(line, '=', n);
324 if (sep)
325 /* chomp newline */
326 n--;
327 else
328 /* replace \n with = */
329 line[n-1] = '=';
330 log_debug("Received: %.*s", (int) n, line);
331
332 r = iovw_put(&source->iovw, line, n);
333 if (r < 0) {
334 log_error("Failed to put line in iovect");
335 free(line);
336 return r;
337 }
338
339 if (!sep)
340 source->state = STATE_DATA_START;
341 return 0; /* continue */
342 }
343
344 case STATE_DATA_START:
345 assert(source->data_size == 0);
346
347 r = get_data_size(source);
348 log_debug("get_data_size() -> %d", r);
349 if (r < 0)
350 return r;
351 if (r == 0) {
352 source->state = STATE_EOF;
353 return 0;
354 }
355
356 source->state = source->data_size > 0 ?
357 STATE_DATA : STATE_DATA_FINISH;
358
359 return 0; /* continue */
360
361 case STATE_DATA: {
362 void *data;
363
364 assert(source->data_size > 0);
365
366 r = get_data_data(source, &data);
367 log_debug("get_data_data() -> %d", r);
368 if (r < 0)
369 return r;
370 if (r == 0) {
371 source->state = STATE_EOF;
372 return 0;
373 }
374
375 assert(data);
376
377 r = iovw_put(&source->iovw, data, source->data_size);
378 if (r < 0) {
379 log_error("failed to put binary buffer in iovect");
380 return r;
381 }
382
383 source->state = STATE_DATA_FINISH;
384
385 return 0; /* continue */
386 }
387
388 case STATE_DATA_FINISH:
389 r = get_data_newline(source);
390 log_debug("get_data_newline() -> %d", r);
391 if (r < 0)
392 return r;
393 if (r == 0) {
394 source->state = STATE_EOF;
395 return 0;
396 }
397
398 source->data_size = 0;
399 source->state = STATE_LINE;
400
401 return 0; /* continue */
402 default:
403 assert_not_reached("wtf?");
404 }
405}
406
407int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
408 int r;
409
410 assert(source);
411 assert(writer);
412
413 r = process_data(source);
414 if (r <= 0)
415 return r;
416
417 /* We have a full event */
418 log_info("Received a full event from source@%p fd:%d (%s)",
419 source, source->fd, source->name);
420
421 if (!source->iovw.count) {
422 log_warning("Entry with no payload, skipping");
423 goto freeing;
424 }
425
426 assert(source->iovw.iovec);
427 assert(source->iovw.count);
428
429 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
430 if (r < 0)
431 log_error("Failed to write entry of %zu bytes: %s",
432 iovw_size(&source->iovw), strerror(-r));
433 else
434 r = 1;
435
436 freeing:
437 iovw_free_contents(&source->iovw);
438 return r;
439}