]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-upload-journal.c
tree-wide: sort includes
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
CommitLineData
b5efdb8a
LP
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
eacbb4d3 22#include <curl/curl.h>
cf0fbc49 23#include <stdbool.h>
eacbb4d3 24
b5efdb8a
LP
25#include "alloc-util.h"
26#include "journal-upload.h"
eacbb4d3
ZJS
27#include "log.h"
28#include "utf8.h"
b5efdb8a 29#include "util.h"
eacbb4d3
ZJS
30
31/**
32 * Write up to size bytes to buf. Return negative on error, and number of
33 * bytes written otherwise. The last case is a kind of an error too.
34 */
35static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
36 int r;
37 size_t pos = 0;
38
39 assert(size <= SSIZE_MAX);
40
57255510 41 for (;;) {
eacbb4d3
ZJS
42
43 switch(u->entry_state) {
44 case ENTRY_CURSOR: {
a1e58e8e 45 u->current_cursor = mfree(u->current_cursor);
eacbb4d3 46
722b6795 47 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
eb56eb9b
MS
48 if (r < 0)
49 return log_error_errno(r, "Failed to get cursor: %m");
eacbb4d3
ZJS
50
51 r = snprintf(buf + pos, size - pos,
722b6795 52 "__CURSOR=%s\n", u->current_cursor);
eacbb4d3
ZJS
53 if (pos + r > size)
54 /* not enough space */
55 return pos;
56
57 u->entry_state ++;
58
59 if (pos + r == size) {
60 /* exactly one character short, but we don't need it */
61 buf[size - 1] = '\n';
62 return size;
63 }
64
65 pos += r;
66 } /* fall through */
67
68 case ENTRY_REALTIME: {
69 usec_t realtime;
70
71 r = sd_journal_get_realtime_usec(u->journal, &realtime);
eb56eb9b
MS
72 if (r < 0)
73 return log_error_errno(r, "Failed to get realtime timestamp: %m");
eacbb4d3
ZJS
74
75 r = snprintf(buf + pos, size - pos,
76 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
77 if (r + pos > size)
78 /* not enough space */
79 return pos;
80
81 u->entry_state ++;
82
83 if (r + pos == size) {
84 /* exactly one character short, but we don't need it */
85 buf[size - 1] = '\n';
86 return size;
87 }
88
89 pos += r;
90 } /* fall through */
91
92 case ENTRY_MONOTONIC: {
93 usec_t monotonic;
94 sd_id128_t boot_id;
95
96 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
eb56eb9b
MS
97 if (r < 0)
98 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
99
100 r = snprintf(buf + pos, size - pos,
101 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
102 if (r + pos > size)
103 /* not enough space */
104 return pos;
105
106 u->entry_state ++;
107
108 if (r + pos == size) {
109 /* exactly one character short, but we don't need it */
110 buf[size - 1] = '\n';
111 return size;
112 }
113
114 pos += r;
115 } /* fall through */
116
117 case ENTRY_BOOT_ID: {
118 sd_id128_t boot_id;
119 char sid[33];
120
121 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
eb56eb9b
MS
122 if (r < 0)
123 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
124
125 r = snprintf(buf + pos, size - pos,
126 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
5ffa8c81 127 if (r + pos > size)
eacbb4d3
ZJS
128 /* not enough space */
129 return pos;
130
131 u->entry_state ++;
132
133 if (r + pos == size) {
134 /* exactly one character short, but we don't need it */
135 buf[size - 1] = '\n';
136 return size;
137 }
138
139 pos += r;
140 } /* fall through */
141
142 case ENTRY_NEW_FIELD: {
143 u->field_pos = 0;
144
145 r = sd_journal_enumerate_data(u->journal,
146 &u->field_data,
147 &u->field_length);
eb56eb9b
MS
148 if (r < 0)
149 return log_error_errno(r, "Failed to move to next field in entry: %m");
150 else if (r == 0) {
eacbb4d3
ZJS
151 u->entry_state = ENTRY_OUTRO;
152 continue;
153 }
154
155 if (!utf8_is_printable_newline(u->field_data,
156 u->field_length, false)) {
157 u->entry_state = ENTRY_BINARY_FIELD_START;
158 continue;
159 }
160
161 u->entry_state ++;
162 } /* fall through */
163
164 case ENTRY_TEXT_FIELD:
165 case ENTRY_BINARY_FIELD: {
166 bool done;
167 size_t tocopy;
168
169 done = size - pos > u->field_length - u->field_pos;
170 if (done)
171 tocopy = u->field_length - u->field_pos;
172 else
173 tocopy = size - pos;
174
175 memcpy(buf + pos,
176 (char*) u->field_data + u->field_pos,
177 tocopy);
178
179 if (done) {
180 buf[pos + tocopy] = '\n';
181 pos += tocopy + 1;
182 u->entry_state = ENTRY_NEW_FIELD;
183 continue;
184 } else {
185 u->field_pos += tocopy;
186 return size;
187 }
188 }
189
190 case ENTRY_BINARY_FIELD_START: {
191 const char *c;
192 size_t len;
193
194 c = memchr(u->field_data, '=', u->field_length);
195 if (!c || c == u->field_data) {
196 log_error("Invalid field.");
197 return -EINVAL;
198 }
199
200 len = c - (const char*)u->field_data;
201
202 /* need space for label + '\n' */
203 if (size - pos < len + 1)
204 return pos;
205
206 memcpy(buf + pos, u->field_data, len);
207 buf[pos + len] = '\n';
208 pos += len + 1;
209
210 u->field_pos = len + 1;
211 u->entry_state ++;
212 } /* fall through */
213
214 case ENTRY_BINARY_FIELD_SIZE: {
215 uint64_t le64;
216
217 /* need space for uint64_t */
218 if (size - pos < 8)
219 return pos;
220
221 le64 = htole64(u->field_length - u->field_pos);
222 memcpy(buf + pos, &le64, 8);
223 pos += 8;
224
225 u->entry_state ++;
226 continue;
227 }
228
229 case ENTRY_OUTRO:
230 /* need space for '\n' */
231 if (size - pos < 1)
232 return pos;
233
234 buf[pos++] = '\n';
235 u->entry_state ++;
236 u->entries_sent ++;
237
238 return pos;
239
240 default:
241 assert_not_reached("WTF?");
242 }
243 }
244 assert_not_reached("WTF?");
245}
246
247static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
248 Uploader *u = userp;
249 int r;
250 sd_journal *j;
251 size_t filled = 0;
252 ssize_t w;
253
254 assert(u);
255 assert(nmemb <= SSIZE_MAX / size);
256
257 j = u->journal;
258
259 while (j && filled < size * nmemb) {
260 if (u->entry_state == ENTRY_DONE) {
261 r = sd_journal_next(j);
262 if (r < 0) {
c33b3297 263 log_error_errno(r, "Failed to move to next entry in journal: %m");
eacbb4d3
ZJS
264 return CURL_READFUNC_ABORT;
265 } else if (r == 0) {
266 if (u->input_event)
267 log_debug("No more entries, waiting for journal.");
268 else {
269 log_info("No more entries, closing journal.");
270 close_journal_input(u);
271 }
272
273 u->uploading = false;
274
275 break;
276 }
277
278 u->entry_state = ENTRY_CURSOR;
279 }
280
281 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
282 if (w < 0)
283 return CURL_READFUNC_ABORT;
284 filled += w;
285
286 if (filled == 0) {
287 log_error("Buffer space is too small to write entry.");
288 return CURL_READFUNC_ABORT;
289 } else if (u->entry_state != ENTRY_DONE)
290 /* This means that all available space was used up */
291 break;
292
293 log_debug("Entry %zu (%s) has been uploaded.",
722b6795 294 u->entries_sent, u->current_cursor);
eacbb4d3
ZJS
295 }
296
297 return filled;
298}
299
300void close_journal_input(Uploader *u) {
301 assert(u);
302
303 if (u->journal) {
304 log_debug("Closing journal input.");
305
306 sd_journal_close(u->journal);
307 u->journal = NULL;
308 }
309 u->timeout = 0;
310}
311
312static int process_journal_input(Uploader *u, int skip) {
313 int r;
314
315 r = sd_journal_next_skip(u->journal, skip);
eb56eb9b
MS
316 if (r < 0)
317 return log_error_errno(r, "Failed to skip to next entry: %m");
318 else if (r < skip)
eacbb4d3
ZJS
319 return 0;
320
321 /* have data */
322 u->entry_state = ENTRY_CURSOR;
323 return start_upload(u, journal_input_callback, u);
324}
325
326int check_journal_input(Uploader *u) {
327 if (u->input_event) {
328 int r;
329
330 r = sd_journal_process(u->journal);
331 if (r < 0) {
da927ba9 332 log_error_errno(r, "Failed to process journal: %m");
eacbb4d3
ZJS
333 close_journal_input(u);
334 return r;
335 }
336
337 if (r == SD_JOURNAL_NOP)
338 return 0;
339 }
340
341 return process_journal_input(u, 1);
342}
343
344static int dispatch_journal_input(sd_event_source *event,
345 int fd,
346 uint32_t revents,
347 void *userp) {
348 Uploader *u = userp;
349
350 assert(u);
351
352 if (u->uploading) {
353 log_warning("dispatch_journal_input called when uploading, ignoring.");
354 return 0;
355 }
356
357 log_debug("Detected journal input, checking for new data.");
358 return check_journal_input(u);
359}
360
361int open_journal_for_upload(Uploader *u,
362 sd_journal *j,
363 const char *cursor,
364 bool after_cursor,
365 bool follow) {
366 int fd, r, events;
367
368 u->journal = j;
369
370 sd_journal_set_data_threshold(j, 0);
371
372 if (follow) {
373 fd = sd_journal_get_fd(j);
eb56eb9b
MS
374 if (fd < 0)
375 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
eacbb4d3
ZJS
376
377 events = sd_journal_get_events(j);
378
379 r = sd_journal_reliable_fd(j);
380 assert(r >= 0);
381 if (r > 0)
382 u->timeout = -1;
383 else
384 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
385
386 r = sd_event_add_io(u->events, &u->input_event,
387 fd, events, dispatch_journal_input, u);
eb56eb9b
MS
388 if (r < 0)
389 return log_error_errno(r, "Failed to register input event: %m");
eacbb4d3
ZJS
390
391 log_debug("Listening for journal events on fd:%d, timeout %d",
392 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
393 } else
394 log_debug("Not listening for journal events.");
395
396 if (cursor) {
397 r = sd_journal_seek_cursor(j, cursor);
ece174c5 398 if (r < 0)
eb56eb9b
MS
399 return log_error_errno(r, "Failed to seek to cursor %s: %m",
400 cursor);
eacbb4d3
ZJS
401 }
402
403 return process_journal_input(u, 1 + !!after_cursor);
404}