]>
Commit | Line | Data |
---|---|---|
fdfccdbc ZJS |
1 | /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ |
2 | ||
3 | /*** | |
4 | This file is part of systemd. | |
5 | ||
6 | Copyright 2014 Zbigniew Jędrzejewski-Szmek | |
7 | ||
8 | systemd is free software; you can redistribute it and/or modify it | |
9 | under the terms of the GNU Lesser General Public License as published by | |
10 | the Free Software Foundation; either version 2.1 of the License, or | |
11 | (at your option) any later version. | |
12 | ||
13 | systemd is distributed in the hope that it will be useful, but | |
14 | WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | Lesser General Public License for more details. | |
17 | ||
18 | You should have received a copy of the GNU Lesser General Public License | |
19 | along with systemd; If not, see <http://www.gnu.org/licenses/>. | |
20 | ***/ | |
21 | ||
22 | #include "journal-remote-parse.h" | |
23 | #include "journald-native.h" | |
24 | ||
25 | #define LINE_CHUNK 1024u | |
26 | ||
27 | void source_free(RemoteSource *source) { | |
28 | if (!source) | |
29 | return; | |
30 | ||
31 | if (source->fd >= 0) { | |
32 | log_debug("Closing fd:%d (%s)", source->fd, source->name); | |
33 | close(source->fd); | |
34 | } | |
35 | free(source->name); | |
36 | free(source->buf); | |
37 | iovw_free_contents(&source->iovw); | |
38 | free(source); | |
39 | } | |
40 | ||
41 | static int get_line(RemoteSource *source, char **line, size_t *size) { | |
42 | ssize_t n, remain; | |
60921179 | 43 | char *c = NULL; |
fdfccdbc ZJS |
44 | char *newbuf = NULL; |
45 | size_t newsize = 0; | |
46 | ||
47 | assert(source); | |
48 | assert(source->state == STATE_LINE); | |
49 | assert(source->filled <= source->size); | |
50 | assert(source->buf == NULL || source->size > 0); | |
51 | ||
60921179 TA |
52 | if (source->buf) |
53 | c = memchr(source->buf, '\n', source->filled); | |
54 | ||
fdfccdbc ZJS |
55 | if (c != NULL) |
56 | goto docopy; | |
57 | ||
58 | resize: | |
9786767a ZJS |
59 | if (source->fd < 0) |
60 | /* we have to wait for some data to come to us */ | |
61 | return -EWOULDBLOCK; | |
62 | ||
fdfccdbc ZJS |
63 | if (source->size - source->filled < LINE_CHUNK) { |
64 | // XXX: add check for maximum line length | |
65 | ||
66 | if (!GREEDY_REALLOC(source->buf, source->size, | |
67 | source->filled + LINE_CHUNK)) | |
68 | return log_oom(); | |
69 | } | |
70 | assert(source->size - source->filled >= LINE_CHUNK); | |
71 | ||
72 | n = read(source->fd, source->buf + source->filled, | |
73 | source->size - source->filled); | |
74 | if (n < 0) { | |
75 | if (errno != EAGAIN && errno != EWOULDBLOCK) | |
76 | log_error("read(%d, ..., %zd): %m", source->fd, | |
77 | source->size - source->filled); | |
78 | return -errno; | |
79 | } else if (n == 0) | |
80 | return 0; | |
81 | ||
82 | c = memchr(source->buf + source->filled, '\n', n); | |
83 | source->filled += n; | |
84 | ||
85 | if (c == NULL) | |
86 | goto resize; | |
87 | ||
88 | docopy: | |
89 | *line = source->buf; | |
90 | *size = c + 1 - source->buf; | |
91 | ||
92 | /* Check if something remains */ | |
93 | remain = source->buf + source->filled - c - 1; | |
94 | assert(remain >= 0); | |
95 | if (remain) { | |
96 | newsize = MAX(remain, LINE_CHUNK); | |
97 | newbuf = malloc(newsize); | |
98 | if (!newbuf) | |
99 | return log_oom(); | |
100 | memcpy(newbuf, c + 1, remain); | |
101 | } | |
102 | source->buf = newbuf; | |
103 | source->size = newsize; | |
104 | source->filled = remain; | |
105 | ||
106 | return 1; | |
107 | } | |
108 | ||
cc64d017 ZJS |
109 | int push_data(RemoteSource *source, const char *data, size_t size) { |
110 | assert(source); | |
111 | assert(source->state != STATE_EOF); | |
112 | ||
113 | if (!GREEDY_REALLOC(source->buf, source->size, | |
114 | source->filled + size)) | |
115 | return log_oom(); | |
116 | ||
117 | memcpy(source->buf + source->filled, data, size); | |
118 | source->filled += size; | |
119 | ||
120 | return 0; | |
121 | } | |
122 | ||
fdfccdbc ZJS |
123 | static int fill_fixed_size(RemoteSource *source, void **data, size_t size) { |
124 | int n; | |
125 | char *newbuf = NULL; | |
126 | size_t newsize = 0, remain; | |
127 | ||
128 | assert(source); | |
129 | assert(source->state == STATE_DATA_START || | |
130 | source->state == STATE_DATA || | |
131 | source->state == STATE_DATA_FINISH); | |
132 | assert(size <= DATA_SIZE_MAX); | |
133 | assert(source->filled <= source->size); | |
134 | assert(source->buf != NULL || source->size == 0); | |
135 | assert(source->buf == NULL || source->size > 0); | |
136 | assert(data); | |
137 | ||
138 | while(source->filled < size) { | |
9786767a ZJS |
139 | if (source->fd < 0) |
140 | /* we have to wait for some data to come to us */ | |
141 | return -EWOULDBLOCK; | |
142 | ||
fdfccdbc ZJS |
143 | if (!GREEDY_REALLOC(source->buf, source->size, size)) |
144 | return log_oom(); | |
145 | ||
146 | n = read(source->fd, source->buf + source->filled, | |
147 | source->size - source->filled); | |
148 | if (n < 0) { | |
149 | if (errno != EAGAIN && errno != EWOULDBLOCK) | |
150 | log_error("read(%d, ..., %zd): %m", source->fd, | |
151 | source->size - source->filled); | |
152 | return -errno; | |
153 | } else if (n == 0) | |
154 | return 0; | |
155 | ||
156 | source->filled += n; | |
157 | } | |
158 | ||
159 | *data = source->buf; | |
160 | ||
161 | /* Check if something remains */ | |
162 | assert(size <= source->filled); | |
163 | remain = source->filled - size; | |
164 | if (remain) { | |
165 | newsize = MAX(remain, LINE_CHUNK); | |
166 | newbuf = malloc(newsize); | |
167 | if (!newbuf) | |
168 | return log_oom(); | |
169 | memcpy(newbuf, source->buf + size, remain); | |
170 | } | |
171 | source->buf = newbuf; | |
172 | source->size = newsize; | |
173 | source->filled = remain; | |
174 | ||
175 | return 1; | |
176 | } | |
177 | ||
178 | static int get_data_size(RemoteSource *source) { | |
179 | int r; | |
c8b32e11 | 180 | _cleanup_free_ void *data = NULL; |
fdfccdbc ZJS |
181 | |
182 | assert(source); | |
183 | assert(source->state == STATE_DATA_START); | |
184 | assert(source->data_size == 0); | |
185 | ||
186 | r = fill_fixed_size(source, &data, sizeof(uint64_t)); | |
187 | if (r <= 0) | |
188 | return r; | |
189 | ||
190 | source->data_size = le64toh( *(uint64_t *) data ); | |
191 | if (source->data_size > DATA_SIZE_MAX) { | |
192 | log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX", | |
193 | source->data_size, DATA_SIZE_MAX); | |
194 | return -EINVAL; | |
195 | } | |
196 | if (source->data_size == 0) | |
197 | log_warning("Binary field with zero length"); | |
198 | ||
199 | return 1; | |
200 | } | |
201 | ||
202 | static int get_data_data(RemoteSource *source, void **data) { | |
203 | int r; | |
204 | ||
205 | assert(source); | |
206 | assert(data); | |
207 | assert(source->state == STATE_DATA); | |
208 | ||
209 | r = fill_fixed_size(source, data, source->data_size); | |
210 | if (r <= 0) | |
211 | return r; | |
212 | ||
213 | return 1; | |
214 | } | |
215 | ||
216 | static int get_data_newline(RemoteSource *source) { | |
217 | int r; | |
c8b32e11 | 218 | _cleanup_free_ char *data = NULL; |
fdfccdbc ZJS |
219 | |
220 | assert(source); | |
221 | assert(source->state == STATE_DATA_FINISH); | |
222 | ||
223 | r = fill_fixed_size(source, (void**) &data, 1); | |
224 | if (r <= 0) | |
225 | return r; | |
226 | ||
227 | assert(data); | |
228 | if (*data != '\n') { | |
229 | log_error("expected newline, got '%c'", *data); | |
230 | return -EINVAL; | |
231 | } | |
232 | ||
233 | return 1; | |
234 | } | |
235 | ||
236 | static int process_dunder(RemoteSource *source, char *line, size_t n) { | |
237 | const char *timestamp; | |
238 | int r; | |
239 | ||
240 | assert(line); | |
241 | assert(n > 0); | |
242 | assert(line[n-1] == '\n'); | |
243 | ||
244 | /* XXX: is it worth to support timestamps in extended format? | |
245 | * We don't produce them, but who knows... */ | |
246 | ||
247 | timestamp = startswith(line, "__CURSOR="); | |
248 | if (timestamp) | |
249 | /* ignore __CURSOR */ | |
250 | return 1; | |
251 | ||
252 | timestamp = startswith(line, "__REALTIME_TIMESTAMP="); | |
253 | if (timestamp) { | |
254 | long long unsigned x; | |
255 | line[n-1] = '\0'; | |
256 | r = safe_atollu(timestamp, &x); | |
257 | if (r < 0) | |
258 | log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp); | |
259 | else | |
260 | source->ts.realtime = x; | |
261 | return r < 0 ? r : 1; | |
262 | } | |
263 | ||
264 | timestamp = startswith(line, "__MONOTONIC_TIMESTAMP="); | |
265 | if (timestamp) { | |
266 | long long unsigned x; | |
267 | line[n-1] = '\0'; | |
268 | r = safe_atollu(timestamp, &x); | |
269 | if (r < 0) | |
270 | log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp); | |
271 | else | |
272 | source->ts.monotonic = x; | |
273 | return r < 0 ? r : 1; | |
274 | } | |
275 | ||
276 | timestamp = startswith(line, "__"); | |
277 | if (timestamp) { | |
278 | log_notice("Unknown dunder line %s", line); | |
279 | return 1; | |
280 | } | |
281 | ||
282 | /* no dunder */ | |
283 | return 0; | |
284 | } | |
285 | ||
286 | int process_data(RemoteSource *source) { | |
287 | int r; | |
288 | ||
289 | switch(source->state) { | |
290 | case STATE_LINE: { | |
291 | char *line, *sep; | |
292 | size_t n; | |
293 | ||
294 | assert(source->data_size == 0); | |
295 | ||
296 | r = get_line(source, &line, &n); | |
297 | if (r < 0) | |
298 | return r; | |
299 | if (r == 0) { | |
300 | source->state = STATE_EOF; | |
301 | return r; | |
302 | } | |
303 | assert(n > 0); | |
304 | assert(line[n-1] == '\n'); | |
305 | ||
306 | if (n == 1) { | |
307 | log_debug("Received empty line, event is ready"); | |
308 | free(line); | |
309 | return 1; | |
310 | } | |
311 | ||
312 | r = process_dunder(source, line, n); | |
313 | if (r != 0) { | |
314 | free(line); | |
315 | return r < 0 ? r : 0; | |
316 | } | |
317 | ||
318 | /* MESSAGE=xxx\n | |
319 | or | |
320 | COREDUMP\n | |
321 | LLLLLLLL0011223344...\n | |
322 | */ | |
323 | sep = memchr(line, '=', n); | |
324 | if (sep) | |
325 | /* chomp newline */ | |
326 | n--; | |
327 | else | |
328 | /* replace \n with = */ | |
329 | line[n-1] = '='; | |
330 | log_debug("Received: %.*s", (int) n, line); | |
331 | ||
332 | r = iovw_put(&source->iovw, line, n); | |
333 | if (r < 0) { | |
334 | log_error("Failed to put line in iovect"); | |
335 | free(line); | |
336 | return r; | |
337 | } | |
338 | ||
339 | if (!sep) | |
340 | source->state = STATE_DATA_START; | |
341 | return 0; /* continue */ | |
342 | } | |
343 | ||
344 | case STATE_DATA_START: | |
345 | assert(source->data_size == 0); | |
346 | ||
347 | r = get_data_size(source); | |
348 | log_debug("get_data_size() -> %d", r); | |
349 | if (r < 0) | |
350 | return r; | |
351 | if (r == 0) { | |
352 | source->state = STATE_EOF; | |
353 | return 0; | |
354 | } | |
355 | ||
356 | source->state = source->data_size > 0 ? | |
357 | STATE_DATA : STATE_DATA_FINISH; | |
358 | ||
359 | return 0; /* continue */ | |
360 | ||
361 | case STATE_DATA: { | |
362 | void *data; | |
363 | ||
364 | assert(source->data_size > 0); | |
365 | ||
366 | r = get_data_data(source, &data); | |
367 | log_debug("get_data_data() -> %d", r); | |
368 | if (r < 0) | |
369 | return r; | |
370 | if (r == 0) { | |
371 | source->state = STATE_EOF; | |
372 | return 0; | |
373 | } | |
374 | ||
375 | assert(data); | |
376 | ||
377 | r = iovw_put(&source->iovw, data, source->data_size); | |
378 | if (r < 0) { | |
379 | log_error("failed to put binary buffer in iovect"); | |
380 | return r; | |
381 | } | |
382 | ||
383 | source->state = STATE_DATA_FINISH; | |
384 | ||
385 | return 0; /* continue */ | |
386 | } | |
387 | ||
388 | case STATE_DATA_FINISH: | |
389 | r = get_data_newline(source); | |
390 | log_debug("get_data_newline() -> %d", r); | |
391 | if (r < 0) | |
392 | return r; | |
393 | if (r == 0) { | |
394 | source->state = STATE_EOF; | |
395 | return 0; | |
396 | } | |
397 | ||
398 | source->data_size = 0; | |
399 | source->state = STATE_LINE; | |
400 | ||
401 | return 0; /* continue */ | |
402 | default: | |
403 | assert_not_reached("wtf?"); | |
404 | } | |
405 | } | |
406 | ||
407 | int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) { | |
408 | int r; | |
409 | ||
410 | assert(source); | |
411 | assert(writer); | |
412 | ||
413 | r = process_data(source); | |
414 | if (r <= 0) | |
415 | return r; | |
416 | ||
417 | /* We have a full event */ | |
418 | log_info("Received a full event from source@%p fd:%d (%s)", | |
419 | source, source->fd, source->name); | |
420 | ||
421 | if (!source->iovw.count) { | |
422 | log_warning("Entry with no payload, skipping"); | |
423 | goto freeing; | |
424 | } | |
425 | ||
426 | assert(source->iovw.iovec); | |
427 | assert(source->iovw.count); | |
428 | ||
429 | r = writer_write(writer, &source->iovw, &source->ts, compress, seal); | |
430 | if (r < 0) | |
431 | log_error("Failed to write entry of %zu bytes: %s", | |
432 | iovw_size(&source->iovw), strerror(-r)); | |
433 | else | |
434 | r = 1; | |
435 | ||
436 | freeing: | |
437 | iovw_free_contents(&source->iovw); | |
438 | return r; | |
439 | } |