]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
tree-wide: sort includes
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <curl/curl.h>
23 #include <stdbool.h>
24
25 #include "alloc-util.h"
26 #include "journal-upload.h"
27 #include "log.h"
28 #include "utf8.h"
29 #include "util.h"
30
31 /**
32 * Write up to size bytes to buf. Return negative on error, and number of
33 * bytes written otherwise. The last case is a kind of an error too.
34 */
35 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
36 int r;
37 size_t pos = 0;
38
39 assert(size <= SSIZE_MAX);
40
41 for (;;) {
42
43 switch(u->entry_state) {
44 case ENTRY_CURSOR: {
45 u->current_cursor = mfree(u->current_cursor);
46
47 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
48 if (r < 0)
49 return log_error_errno(r, "Failed to get cursor: %m");
50
51 r = snprintf(buf + pos, size - pos,
52 "__CURSOR=%s\n", u->current_cursor);
53 if (pos + r > size)
54 /* not enough space */
55 return pos;
56
57 u->entry_state ++;
58
59 if (pos + r == size) {
60 /* exactly one character short, but we don't need it */
61 buf[size - 1] = '\n';
62 return size;
63 }
64
65 pos += r;
66 } /* fall through */
67
68 case ENTRY_REALTIME: {
69 usec_t realtime;
70
71 r = sd_journal_get_realtime_usec(u->journal, &realtime);
72 if (r < 0)
73 return log_error_errno(r, "Failed to get realtime timestamp: %m");
74
75 r = snprintf(buf + pos, size - pos,
76 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
77 if (r + pos > size)
78 /* not enough space */
79 return pos;
80
81 u->entry_state ++;
82
83 if (r + pos == size) {
84 /* exactly one character short, but we don't need it */
85 buf[size - 1] = '\n';
86 return size;
87 }
88
89 pos += r;
90 } /* fall through */
91
92 case ENTRY_MONOTONIC: {
93 usec_t monotonic;
94 sd_id128_t boot_id;
95
96 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
97 if (r < 0)
98 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
99
100 r = snprintf(buf + pos, size - pos,
101 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
102 if (r + pos > size)
103 /* not enough space */
104 return pos;
105
106 u->entry_state ++;
107
108 if (r + pos == size) {
109 /* exactly one character short, but we don't need it */
110 buf[size - 1] = '\n';
111 return size;
112 }
113
114 pos += r;
115 } /* fall through */
116
117 case ENTRY_BOOT_ID: {
118 sd_id128_t boot_id;
119 char sid[33];
120
121 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
122 if (r < 0)
123 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
124
125 r = snprintf(buf + pos, size - pos,
126 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
127 if (r + pos > size)
128 /* not enough space */
129 return pos;
130
131 u->entry_state ++;
132
133 if (r + pos == size) {
134 /* exactly one character short, but we don't need it */
135 buf[size - 1] = '\n';
136 return size;
137 }
138
139 pos += r;
140 } /* fall through */
141
142 case ENTRY_NEW_FIELD: {
143 u->field_pos = 0;
144
145 r = sd_journal_enumerate_data(u->journal,
146 &u->field_data,
147 &u->field_length);
148 if (r < 0)
149 return log_error_errno(r, "Failed to move to next field in entry: %m");
150 else if (r == 0) {
151 u->entry_state = ENTRY_OUTRO;
152 continue;
153 }
154
155 if (!utf8_is_printable_newline(u->field_data,
156 u->field_length, false)) {
157 u->entry_state = ENTRY_BINARY_FIELD_START;
158 continue;
159 }
160
161 u->entry_state ++;
162 } /* fall through */
163
164 case ENTRY_TEXT_FIELD:
165 case ENTRY_BINARY_FIELD: {
166 bool done;
167 size_t tocopy;
168
169 done = size - pos > u->field_length - u->field_pos;
170 if (done)
171 tocopy = u->field_length - u->field_pos;
172 else
173 tocopy = size - pos;
174
175 memcpy(buf + pos,
176 (char*) u->field_data + u->field_pos,
177 tocopy);
178
179 if (done) {
180 buf[pos + tocopy] = '\n';
181 pos += tocopy + 1;
182 u->entry_state = ENTRY_NEW_FIELD;
183 continue;
184 } else {
185 u->field_pos += tocopy;
186 return size;
187 }
188 }
189
190 case ENTRY_BINARY_FIELD_START: {
191 const char *c;
192 size_t len;
193
194 c = memchr(u->field_data, '=', u->field_length);
195 if (!c || c == u->field_data) {
196 log_error("Invalid field.");
197 return -EINVAL;
198 }
199
200 len = c - (const char*)u->field_data;
201
202 /* need space for label + '\n' */
203 if (size - pos < len + 1)
204 return pos;
205
206 memcpy(buf + pos, u->field_data, len);
207 buf[pos + len] = '\n';
208 pos += len + 1;
209
210 u->field_pos = len + 1;
211 u->entry_state ++;
212 } /* fall through */
213
214 case ENTRY_BINARY_FIELD_SIZE: {
215 uint64_t le64;
216
217 /* need space for uint64_t */
218 if (size - pos < 8)
219 return pos;
220
221 le64 = htole64(u->field_length - u->field_pos);
222 memcpy(buf + pos, &le64, 8);
223 pos += 8;
224
225 u->entry_state ++;
226 continue;
227 }
228
229 case ENTRY_OUTRO:
230 /* need space for '\n' */
231 if (size - pos < 1)
232 return pos;
233
234 buf[pos++] = '\n';
235 u->entry_state ++;
236 u->entries_sent ++;
237
238 return pos;
239
240 default:
241 assert_not_reached("WTF?");
242 }
243 }
244 assert_not_reached("WTF?");
245 }
246
247 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
248 Uploader *u = userp;
249 int r;
250 sd_journal *j;
251 size_t filled = 0;
252 ssize_t w;
253
254 assert(u);
255 assert(nmemb <= SSIZE_MAX / size);
256
257 j = u->journal;
258
259 while (j && filled < size * nmemb) {
260 if (u->entry_state == ENTRY_DONE) {
261 r = sd_journal_next(j);
262 if (r < 0) {
263 log_error_errno(r, "Failed to move to next entry in journal: %m");
264 return CURL_READFUNC_ABORT;
265 } else if (r == 0) {
266 if (u->input_event)
267 log_debug("No more entries, waiting for journal.");
268 else {
269 log_info("No more entries, closing journal.");
270 close_journal_input(u);
271 }
272
273 u->uploading = false;
274
275 break;
276 }
277
278 u->entry_state = ENTRY_CURSOR;
279 }
280
281 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
282 if (w < 0)
283 return CURL_READFUNC_ABORT;
284 filled += w;
285
286 if (filled == 0) {
287 log_error("Buffer space is too small to write entry.");
288 return CURL_READFUNC_ABORT;
289 } else if (u->entry_state != ENTRY_DONE)
290 /* This means that all available space was used up */
291 break;
292
293 log_debug("Entry %zu (%s) has been uploaded.",
294 u->entries_sent, u->current_cursor);
295 }
296
297 return filled;
298 }
299
300 void close_journal_input(Uploader *u) {
301 assert(u);
302
303 if (u->journal) {
304 log_debug("Closing journal input.");
305
306 sd_journal_close(u->journal);
307 u->journal = NULL;
308 }
309 u->timeout = 0;
310 }
311
312 static int process_journal_input(Uploader *u, int skip) {
313 int r;
314
315 r = sd_journal_next_skip(u->journal, skip);
316 if (r < 0)
317 return log_error_errno(r, "Failed to skip to next entry: %m");
318 else if (r < skip)
319 return 0;
320
321 /* have data */
322 u->entry_state = ENTRY_CURSOR;
323 return start_upload(u, journal_input_callback, u);
324 }
325
326 int check_journal_input(Uploader *u) {
327 if (u->input_event) {
328 int r;
329
330 r = sd_journal_process(u->journal);
331 if (r < 0) {
332 log_error_errno(r, "Failed to process journal: %m");
333 close_journal_input(u);
334 return r;
335 }
336
337 if (r == SD_JOURNAL_NOP)
338 return 0;
339 }
340
341 return process_journal_input(u, 1);
342 }
343
344 static int dispatch_journal_input(sd_event_source *event,
345 int fd,
346 uint32_t revents,
347 void *userp) {
348 Uploader *u = userp;
349
350 assert(u);
351
352 if (u->uploading) {
353 log_warning("dispatch_journal_input called when uploading, ignoring.");
354 return 0;
355 }
356
357 log_debug("Detected journal input, checking for new data.");
358 return check_journal_input(u);
359 }
360
361 int open_journal_for_upload(Uploader *u,
362 sd_journal *j,
363 const char *cursor,
364 bool after_cursor,
365 bool follow) {
366 int fd, r, events;
367
368 u->journal = j;
369
370 sd_journal_set_data_threshold(j, 0);
371
372 if (follow) {
373 fd = sd_journal_get_fd(j);
374 if (fd < 0)
375 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
376
377 events = sd_journal_get_events(j);
378
379 r = sd_journal_reliable_fd(j);
380 assert(r >= 0);
381 if (r > 0)
382 u->timeout = -1;
383 else
384 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
385
386 r = sd_event_add_io(u->events, &u->input_event,
387 fd, events, dispatch_journal_input, u);
388 if (r < 0)
389 return log_error_errno(r, "Failed to register input event: %m");
390
391 log_debug("Listening for journal events on fd:%d, timeout %d",
392 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
393 } else
394 log_debug("Not listening for journal events.");
395
396 if (cursor) {
397 r = sd_journal_seek_cursor(j, cursor);
398 if (r < 0)
399 return log_error_errno(r, "Failed to seek to cursor %s: %m",
400 cursor);
401 }
402
403 return process_journal_input(u, 1 + !!after_cursor);
404 }