git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal/journal-remote-parse.c
journal-remote: HTTP(s) support
[thirdparty/systemd.git] / src / journal / journal-remote-parse.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include "journal-remote-parse.h"
23 #include "journald-native.h"
24
25 #define LINE_CHUNK 1024u
26
27 void source_free(RemoteSource *source) {
28 if (!source)
29 return;
30
31 if (source->fd >= 0) {
32 log_debug("Closing fd:%d (%s)", source->fd, source->name);
33 close(source->fd);
34 }
35 free(source->name);
36 free(source->buf);
37 iovw_free_contents(&source->iovw);
38 free(source);
39 }
40
41 static int get_line(RemoteSource *source, char **line, size_t *size) {
42 ssize_t n, remain;
43 char *c;
44 char *newbuf = NULL;
45 size_t newsize = 0;
46
47 assert(source);
48 assert(source->state == STATE_LINE);
49 assert(source->filled <= source->size);
50 assert(source->buf == NULL || source->size > 0);
51
52 c = memchr(source->buf, '\n', source->filled);
53 if (c != NULL)
54 goto docopy;
55
56 resize:
57 if (source->size - source->filled < LINE_CHUNK) {
58 // XXX: add check for maximum line length
59
60 if (!GREEDY_REALLOC(source->buf, source->size,
61 source->filled + LINE_CHUNK))
62 return log_oom();
63 }
64 assert(source->size - source->filled >= LINE_CHUNK);
65
66 n = read(source->fd, source->buf + source->filled,
67 source->size - source->filled);
68 if (n < 0) {
69 if (errno != EAGAIN && errno != EWOULDBLOCK)
70 log_error("read(%d, ..., %zd): %m", source->fd,
71 source->size - source->filled);
72 return -errno;
73 } else if (n == 0)
74 return 0;
75
76 c = memchr(source->buf + source->filled, '\n', n);
77 source->filled += n;
78
79 if (c == NULL)
80 goto resize;
81
82 docopy:
83 *line = source->buf;
84 *size = c + 1 - source->buf;
85
86 /* Check if something remains */
87 remain = source->buf + source->filled - c - 1;
88 assert(remain >= 0);
89 if (remain) {
90 newsize = MAX(remain, LINE_CHUNK);
91 newbuf = malloc(newsize);
92 if (!newbuf)
93 return log_oom();
94 memcpy(newbuf, c + 1, remain);
95 }
96 source->buf = newbuf;
97 source->size = newsize;
98 source->filled = remain;
99
100 return 1;
101 }
102
103 int push_data(RemoteSource *source, const char *data, size_t size) {
104 assert(source);
105 assert(source->state != STATE_EOF);
106
107 if (!GREEDY_REALLOC(source->buf, source->size,
108 source->filled + size))
109 return log_oom();
110
111 memcpy(source->buf + source->filled, data, size);
112 source->filled += size;
113
114 return 0;
115 }
116
117 static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
118 int n;
119 char *newbuf = NULL;
120 size_t newsize = 0, remain;
121
122 assert(source);
123 assert(source->state == STATE_DATA_START ||
124 source->state == STATE_DATA ||
125 source->state == STATE_DATA_FINISH);
126 assert(size <= DATA_SIZE_MAX);
127 assert(source->filled <= source->size);
128 assert(source->buf != NULL || source->size == 0);
129 assert(source->buf == NULL || source->size > 0);
130 assert(data);
131
132 while(source->filled < size) {
133 if (!GREEDY_REALLOC(source->buf, source->size, size))
134 return log_oom();
135
136 n = read(source->fd, source->buf + source->filled,
137 source->size - source->filled);
138 if (n < 0) {
139 if (errno != EAGAIN && errno != EWOULDBLOCK)
140 log_error("read(%d, ..., %zd): %m", source->fd,
141 source->size - source->filled);
142 return -errno;
143 } else if (n == 0)
144 return 0;
145
146 source->filled += n;
147 }
148
149 *data = source->buf;
150
151 /* Check if something remains */
152 assert(size <= source->filled);
153 remain = source->filled - size;
154 if (remain) {
155 newsize = MAX(remain, LINE_CHUNK);
156 newbuf = malloc(newsize);
157 if (!newbuf)
158 return log_oom();
159 memcpy(newbuf, source->buf + size, remain);
160 }
161 source->buf = newbuf;
162 source->size = newsize;
163 source->filled = remain;
164
165 return 1;
166 }
167
168 static int get_data_size(RemoteSource *source) {
169 int r;
170 void _cleanup_free_ *data = NULL;
171
172 assert(source);
173 assert(source->state == STATE_DATA_START);
174 assert(source->data_size == 0);
175
176 r = fill_fixed_size(source, &data, sizeof(uint64_t));
177 if (r <= 0)
178 return r;
179
180 source->data_size = le64toh( *(uint64_t *) data );
181 if (source->data_size > DATA_SIZE_MAX) {
182 log_error("Stream declares field with size %zu > %u == DATA_SIZE_MAX",
183 source->data_size, DATA_SIZE_MAX);
184 return -EINVAL;
185 }
186 if (source->data_size == 0)
187 log_warning("Binary field with zero length");
188
189 return 1;
190 }
191
192 static int get_data_data(RemoteSource *source, void **data) {
193 int r;
194
195 assert(source);
196 assert(data);
197 assert(source->state == STATE_DATA);
198
199 r = fill_fixed_size(source, data, source->data_size);
200 if (r <= 0)
201 return r;
202
203 return 1;
204 }
205
206 static int get_data_newline(RemoteSource *source) {
207 int r;
208 char _cleanup_free_ *data = NULL;
209
210 assert(source);
211 assert(source->state == STATE_DATA_FINISH);
212
213 r = fill_fixed_size(source, (void**) &data, 1);
214 if (r <= 0)
215 return r;
216
217 assert(data);
218 if (*data != '\n') {
219 log_error("expected newline, got '%c'", *data);
220 return -EINVAL;
221 }
222
223 return 1;
224 }
225
226 static int process_dunder(RemoteSource *source, char *line, size_t n) {
227 const char *timestamp;
228 int r;
229
230 assert(line);
231 assert(n > 0);
232 assert(line[n-1] == '\n');
233
234 /* XXX: is it worth to support timestamps in extended format?
235 * We don't produce them, but who knows... */
236
237 timestamp = startswith(line, "__CURSOR=");
238 if (timestamp)
239 /* ignore __CURSOR */
240 return 1;
241
242 timestamp = startswith(line, "__REALTIME_TIMESTAMP=");
243 if (timestamp) {
244 long long unsigned x;
245 line[n-1] = '\0';
246 r = safe_atollu(timestamp, &x);
247 if (r < 0)
248 log_warning("Failed to parse __REALTIME_TIMESTAMP: '%s'", timestamp);
249 else
250 source->ts.realtime = x;
251 return r < 0 ? r : 1;
252 }
253
254 timestamp = startswith(line, "__MONOTONIC_TIMESTAMP=");
255 if (timestamp) {
256 long long unsigned x;
257 line[n-1] = '\0';
258 r = safe_atollu(timestamp, &x);
259 if (r < 0)
260 log_warning("Failed to parse __MONOTONIC_TIMESTAMP: '%s'", timestamp);
261 else
262 source->ts.monotonic = x;
263 return r < 0 ? r : 1;
264 }
265
266 timestamp = startswith(line, "__");
267 if (timestamp) {
268 log_notice("Unknown dunder line %s", line);
269 return 1;
270 }
271
272 /* no dunder */
273 return 0;
274 }
275
276 int process_data(RemoteSource *source) {
277 int r;
278
279 switch(source->state) {
280 case STATE_LINE: {
281 char *line, *sep;
282 size_t n;
283
284 assert(source->data_size == 0);
285
286 r = get_line(source, &line, &n);
287 if (r < 0)
288 return r;
289 if (r == 0) {
290 source->state = STATE_EOF;
291 return r;
292 }
293 assert(n > 0);
294 assert(line[n-1] == '\n');
295
296 if (n == 1) {
297 log_debug("Received empty line, event is ready");
298 free(line);
299 return 1;
300 }
301
302 r = process_dunder(source, line, n);
303 if (r != 0) {
304 free(line);
305 return r < 0 ? r : 0;
306 }
307
308 /* MESSAGE=xxx\n
309 or
310 COREDUMP\n
311 LLLLLLLL0011223344...\n
312 */
313 sep = memchr(line, '=', n);
314 if (sep)
315 /* chomp newline */
316 n--;
317 else
318 /* replace \n with = */
319 line[n-1] = '=';
320 log_debug("Received: %.*s", (int) n, line);
321
322 r = iovw_put(&source->iovw, line, n);
323 if (r < 0) {
324 log_error("Failed to put line in iovect");
325 free(line);
326 return r;
327 }
328
329 if (!sep)
330 source->state = STATE_DATA_START;
331 return 0; /* continue */
332 }
333
334 case STATE_DATA_START:
335 assert(source->data_size == 0);
336
337 r = get_data_size(source);
338 log_debug("get_data_size() -> %d", r);
339 if (r < 0)
340 return r;
341 if (r == 0) {
342 source->state = STATE_EOF;
343 return 0;
344 }
345
346 source->state = source->data_size > 0 ?
347 STATE_DATA : STATE_DATA_FINISH;
348
349 return 0; /* continue */
350
351 case STATE_DATA: {
352 void *data;
353
354 assert(source->data_size > 0);
355
356 r = get_data_data(source, &data);
357 log_debug("get_data_data() -> %d", r);
358 if (r < 0)
359 return r;
360 if (r == 0) {
361 source->state = STATE_EOF;
362 return 0;
363 }
364
365 assert(data);
366
367 r = iovw_put(&source->iovw, data, source->data_size);
368 if (r < 0) {
369 log_error("failed to put binary buffer in iovect");
370 return r;
371 }
372
373 source->state = STATE_DATA_FINISH;
374
375 return 0; /* continue */
376 }
377
378 case STATE_DATA_FINISH:
379 r = get_data_newline(source);
380 log_debug("get_data_newline() -> %d", r);
381 if (r < 0)
382 return r;
383 if (r == 0) {
384 source->state = STATE_EOF;
385 return 0;
386 }
387
388 source->data_size = 0;
389 source->state = STATE_LINE;
390
391 return 0; /* continue */
392 default:
393 assert_not_reached("wtf?");
394 }
395 }
396
397 int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
398 int r;
399
400 assert(source);
401 assert(writer);
402
403 r = process_data(source);
404 if (r <= 0)
405 return r;
406
407 /* We have a full event */
408 log_info("Received a full event from source@%p fd:%d (%s)",
409 source, source->fd, source->name);
410
411 if (!source->iovw.count) {
412 log_warning("Entry with no payload, skipping");
413 goto freeing;
414 }
415
416 assert(source->iovw.iovec);
417 assert(source->iovw.count);
418
419 r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
420 if (r < 0)
421 log_error("Failed to write entry of %zu bytes: %s",
422 iovw_size(&source->iovw), strerror(-r));
423 else
424 r = 1;
425
426 freeing:
427 iovw_free_contents(&source->iovw);
428 return r;
429 }