]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/journal-remote/journal-upload-journal.c
logind: check return value of session_release
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
CommitLineData
eacbb4d3
ZJS
1#include <stdbool.h>
2
3#include <curl/curl.h>
4
5#include "util.h"
6#include "log.h"
7#include "utf8.h"
8#include "journal-upload.h"
9
10/**
11 * Write up to size bytes to buf. Return negative on error, and number of
12 * bytes written otherwise. The last case is a kind of an error too.
13 */
14static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
15 int r;
16 size_t pos = 0;
17
18 assert(size <= SSIZE_MAX);
19
20 while (true) {
21
22 switch(u->entry_state) {
23 case ENTRY_CURSOR: {
722b6795
ZJS
24 free(u->current_cursor);
25 u->current_cursor = NULL;
eacbb4d3 26
722b6795 27 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
eb56eb9b
MS
28 if (r < 0)
29 return log_error_errno(r, "Failed to get cursor: %m");
eacbb4d3
ZJS
30
31 r = snprintf(buf + pos, size - pos,
722b6795 32 "__CURSOR=%s\n", u->current_cursor);
eacbb4d3
ZJS
33 if (pos + r > size)
34 /* not enough space */
35 return pos;
36
37 u->entry_state ++;
38
39 if (pos + r == size) {
40 /* exactly one character short, but we don't need it */
41 buf[size - 1] = '\n';
42 return size;
43 }
44
45 pos += r;
46 } /* fall through */
47
48 case ENTRY_REALTIME: {
49 usec_t realtime;
50
51 r = sd_journal_get_realtime_usec(u->journal, &realtime);
eb56eb9b
MS
52 if (r < 0)
53 return log_error_errno(r, "Failed to get realtime timestamp: %m");
eacbb4d3
ZJS
54
55 r = snprintf(buf + pos, size - pos,
56 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
57 if (r + pos > size)
58 /* not enough space */
59 return pos;
60
61 u->entry_state ++;
62
63 if (r + pos == size) {
64 /* exactly one character short, but we don't need it */
65 buf[size - 1] = '\n';
66 return size;
67 }
68
69 pos += r;
70 } /* fall through */
71
72 case ENTRY_MONOTONIC: {
73 usec_t monotonic;
74 sd_id128_t boot_id;
75
76 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
eb56eb9b
MS
77 if (r < 0)
78 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
79
80 r = snprintf(buf + pos, size - pos,
81 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
82 if (r + pos > size)
83 /* not enough space */
84 return pos;
85
86 u->entry_state ++;
87
88 if (r + pos == size) {
89 /* exactly one character short, but we don't need it */
90 buf[size - 1] = '\n';
91 return size;
92 }
93
94 pos += r;
95 } /* fall through */
96
97 case ENTRY_BOOT_ID: {
98 sd_id128_t boot_id;
99 char sid[33];
100
101 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
eb56eb9b
MS
102 if (r < 0)
103 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
eacbb4d3
ZJS
104
105 r = snprintf(buf + pos, size - pos,
106 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
5ffa8c81 107 if (r + pos > size)
eacbb4d3
ZJS
108 /* not enough space */
109 return pos;
110
111 u->entry_state ++;
112
113 if (r + pos == size) {
114 /* exactly one character short, but we don't need it */
115 buf[size - 1] = '\n';
116 return size;
117 }
118
119 pos += r;
120 } /* fall through */
121
122 case ENTRY_NEW_FIELD: {
123 u->field_pos = 0;
124
125 r = sd_journal_enumerate_data(u->journal,
126 &u->field_data,
127 &u->field_length);
eb56eb9b
MS
128 if (r < 0)
129 return log_error_errno(r, "Failed to move to next field in entry: %m");
130 else if (r == 0) {
eacbb4d3
ZJS
131 u->entry_state = ENTRY_OUTRO;
132 continue;
133 }
134
135 if (!utf8_is_printable_newline(u->field_data,
136 u->field_length, false)) {
137 u->entry_state = ENTRY_BINARY_FIELD_START;
138 continue;
139 }
140
141 u->entry_state ++;
142 } /* fall through */
143
144 case ENTRY_TEXT_FIELD:
145 case ENTRY_BINARY_FIELD: {
146 bool done;
147 size_t tocopy;
148
149 done = size - pos > u->field_length - u->field_pos;
150 if (done)
151 tocopy = u->field_length - u->field_pos;
152 else
153 tocopy = size - pos;
154
155 memcpy(buf + pos,
156 (char*) u->field_data + u->field_pos,
157 tocopy);
158
159 if (done) {
160 buf[pos + tocopy] = '\n';
161 pos += tocopy + 1;
162 u->entry_state = ENTRY_NEW_FIELD;
163 continue;
164 } else {
165 u->field_pos += tocopy;
166 return size;
167 }
168 }
169
170 case ENTRY_BINARY_FIELD_START: {
171 const char *c;
172 size_t len;
173
174 c = memchr(u->field_data, '=', u->field_length);
175 if (!c || c == u->field_data) {
176 log_error("Invalid field.");
177 return -EINVAL;
178 }
179
180 len = c - (const char*)u->field_data;
181
182 /* need space for label + '\n' */
183 if (size - pos < len + 1)
184 return pos;
185
186 memcpy(buf + pos, u->field_data, len);
187 buf[pos + len] = '\n';
188 pos += len + 1;
189
190 u->field_pos = len + 1;
191 u->entry_state ++;
192 } /* fall through */
193
194 case ENTRY_BINARY_FIELD_SIZE: {
195 uint64_t le64;
196
197 /* need space for uint64_t */
198 if (size - pos < 8)
199 return pos;
200
201 le64 = htole64(u->field_length - u->field_pos);
202 memcpy(buf + pos, &le64, 8);
203 pos += 8;
204
205 u->entry_state ++;
206 continue;
207 }
208
209 case ENTRY_OUTRO:
210 /* need space for '\n' */
211 if (size - pos < 1)
212 return pos;
213
214 buf[pos++] = '\n';
215 u->entry_state ++;
216 u->entries_sent ++;
217
218 return pos;
219
220 default:
221 assert_not_reached("WTF?");
222 }
223 }
224 assert_not_reached("WTF?");
225}
226
227static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
228 Uploader *u = userp;
229 int r;
230 sd_journal *j;
231 size_t filled = 0;
232 ssize_t w;
233
234 assert(u);
235 assert(nmemb <= SSIZE_MAX / size);
236
237 j = u->journal;
238
239 while (j && filled < size * nmemb) {
240 if (u->entry_state == ENTRY_DONE) {
241 r = sd_journal_next(j);
242 if (r < 0) {
c33b3297 243 log_error_errno(r, "Failed to move to next entry in journal: %m");
eacbb4d3
ZJS
244 return CURL_READFUNC_ABORT;
245 } else if (r == 0) {
246 if (u->input_event)
247 log_debug("No more entries, waiting for journal.");
248 else {
249 log_info("No more entries, closing journal.");
250 close_journal_input(u);
251 }
252
253 u->uploading = false;
254
255 break;
256 }
257
258 u->entry_state = ENTRY_CURSOR;
259 }
260
261 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
262 if (w < 0)
263 return CURL_READFUNC_ABORT;
264 filled += w;
265
266 if (filled == 0) {
267 log_error("Buffer space is too small to write entry.");
268 return CURL_READFUNC_ABORT;
269 } else if (u->entry_state != ENTRY_DONE)
270 /* This means that all available space was used up */
271 break;
272
273 log_debug("Entry %zu (%s) has been uploaded.",
722b6795 274 u->entries_sent, u->current_cursor);
eacbb4d3
ZJS
275 }
276
277 return filled;
278}
279
280void close_journal_input(Uploader *u) {
281 assert(u);
282
283 if (u->journal) {
284 log_debug("Closing journal input.");
285
286 sd_journal_close(u->journal);
287 u->journal = NULL;
288 }
289 u->timeout = 0;
290}
291
292static int process_journal_input(Uploader *u, int skip) {
293 int r;
294
295 r = sd_journal_next_skip(u->journal, skip);
eb56eb9b
MS
296 if (r < 0)
297 return log_error_errno(r, "Failed to skip to next entry: %m");
298 else if (r < skip)
eacbb4d3
ZJS
299 return 0;
300
301 /* have data */
302 u->entry_state = ENTRY_CURSOR;
303 return start_upload(u, journal_input_callback, u);
304}
305
306int check_journal_input(Uploader *u) {
307 if (u->input_event) {
308 int r;
309
310 r = sd_journal_process(u->journal);
311 if (r < 0) {
da927ba9 312 log_error_errno(r, "Failed to process journal: %m");
eacbb4d3
ZJS
313 close_journal_input(u);
314 return r;
315 }
316
317 if (r == SD_JOURNAL_NOP)
318 return 0;
319 }
320
321 return process_journal_input(u, 1);
322}
323
324static int dispatch_journal_input(sd_event_source *event,
325 int fd,
326 uint32_t revents,
327 void *userp) {
328 Uploader *u = userp;
329
330 assert(u);
331
332 if (u->uploading) {
333 log_warning("dispatch_journal_input called when uploading, ignoring.");
334 return 0;
335 }
336
337 log_debug("Detected journal input, checking for new data.");
338 return check_journal_input(u);
339}
340
341int open_journal_for_upload(Uploader *u,
342 sd_journal *j,
343 const char *cursor,
344 bool after_cursor,
345 bool follow) {
346 int fd, r, events;
347
348 u->journal = j;
349
350 sd_journal_set_data_threshold(j, 0);
351
352 if (follow) {
353 fd = sd_journal_get_fd(j);
eb56eb9b
MS
354 if (fd < 0)
355 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
eacbb4d3
ZJS
356
357 events = sd_journal_get_events(j);
358
359 r = sd_journal_reliable_fd(j);
360 assert(r >= 0);
361 if (r > 0)
362 u->timeout = -1;
363 else
364 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
365
366 r = sd_event_add_io(u->events, &u->input_event,
367 fd, events, dispatch_journal_input, u);
eb56eb9b
MS
368 if (r < 0)
369 return log_error_errno(r, "Failed to register input event: %m");
eacbb4d3
ZJS
370
371 log_debug("Listening for journal events on fd:%d, timeout %d",
372 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
373 } else
374 log_debug("Not listening for journal events.");
375
376 if (cursor) {
377 r = sd_journal_seek_cursor(j, cursor);
378 if (r < 0) {
eb56eb9b
MS
379 return log_error_errno(r, "Failed to seek to cursor %s: %m",
380 cursor);
eacbb4d3
ZJS
381 }
382 }
383
384 return process_journal_input(u, 1 + !!after_cursor);
385}