]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
1cd52db2c19ae18ff8c3755f1a266b260f23f4cf
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 #include <stdbool.h>
2
3 #include <curl/curl.h>
4
5 #include "util.h"
6 #include "log.h"
7 #include "utf8.h"
8 #include "journal-upload.h"
9
10 /**
11 * Write up to size bytes to buf. Return negative on error, and number of
12 * bytes written otherwise. The last case is a kind of an error too.
13 */
14 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
15 int r;
16 size_t pos = 0;
17
18 assert(size <= SSIZE_MAX);
19
20 while (true) {
21
22 switch(u->entry_state) {
23 case ENTRY_CURSOR: {
24 free(u->current_cursor);
25 u->current_cursor = NULL;
26
27 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
28 if (r < 0) {
29 log_error("Failed to get cursor: %s", strerror(-r));
30 return r;
31 }
32
33 r = snprintf(buf + pos, size - pos,
34 "__CURSOR=%s\n", u->current_cursor);
35 if (pos + r > size)
36 /* not enough space */
37 return pos;
38
39 u->entry_state ++;
40
41 if (pos + r == size) {
42 /* exactly one character short, but we don't need it */
43 buf[size - 1] = '\n';
44 return size;
45 }
46
47 pos += r;
48 } /* fall through */
49
50 case ENTRY_REALTIME: {
51 usec_t realtime;
52
53 r = sd_journal_get_realtime_usec(u->journal, &realtime);
54 if (r < 0) {
55 log_error("Failed to get realtime timestamp: %s", strerror(-r));
56 return r;
57 }
58
59 r = snprintf(buf + pos, size - pos,
60 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
61 if (r + pos > size)
62 /* not enough space */
63 return pos;
64
65 u->entry_state ++;
66
67 if (r + pos == size) {
68 /* exactly one character short, but we don't need it */
69 buf[size - 1] = '\n';
70 return size;
71 }
72
73 pos += r;
74 } /* fall through */
75
76 case ENTRY_MONOTONIC: {
77 usec_t monotonic;
78 sd_id128_t boot_id;
79
80 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
81 if (r < 0) {
82 log_error("Failed to get monotonic timestamp: %s", strerror(-r));
83 return r;
84 }
85
86 r = snprintf(buf + pos, size - pos,
87 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
88 if (r + pos > size)
89 /* not enough space */
90 return pos;
91
92 u->entry_state ++;
93
94 if (r + pos == size) {
95 /* exactly one character short, but we don't need it */
96 buf[size - 1] = '\n';
97 return size;
98 }
99
100 pos += r;
101 } /* fall through */
102
103 case ENTRY_BOOT_ID: {
104 sd_id128_t boot_id;
105 char sid[33];
106
107 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
108 if (r < 0) {
109 log_error("Failed to get monotonic timestamp: %s", strerror(-r));
110 return r;
111 }
112
113 r = snprintf(buf + pos, size - pos,
114 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
115 if (r + pos> size)
116 /* not enough space */
117 return pos;
118
119 u->entry_state ++;
120
121 if (r + pos == size) {
122 /* exactly one character short, but we don't need it */
123 buf[size - 1] = '\n';
124 return size;
125 }
126
127 pos += r;
128 } /* fall through */
129
130 case ENTRY_NEW_FIELD: {
131 u->field_pos = 0;
132
133 r = sd_journal_enumerate_data(u->journal,
134 &u->field_data,
135 &u->field_length);
136 if (r < 0) {
137 log_error("Failed to move to next field in entry: %s",
138 strerror(-r));
139 return r;
140 } else if (r == 0) {
141 u->entry_state = ENTRY_OUTRO;
142 continue;
143 }
144
145 if (!utf8_is_printable_newline(u->field_data,
146 u->field_length, false)) {
147 u->entry_state = ENTRY_BINARY_FIELD_START;
148 continue;
149 }
150
151 u->entry_state ++;
152 } /* fall through */
153
154 case ENTRY_TEXT_FIELD:
155 case ENTRY_BINARY_FIELD: {
156 bool done;
157 size_t tocopy;
158
159 done = size - pos > u->field_length - u->field_pos;
160 if (done)
161 tocopy = u->field_length - u->field_pos;
162 else
163 tocopy = size - pos;
164
165 memcpy(buf + pos,
166 (char*) u->field_data + u->field_pos,
167 tocopy);
168
169 if (done) {
170 buf[pos + tocopy] = '\n';
171 pos += tocopy + 1;
172 u->entry_state = ENTRY_NEW_FIELD;
173 continue;
174 } else {
175 u->field_pos += tocopy;
176 return size;
177 }
178 }
179
180 case ENTRY_BINARY_FIELD_START: {
181 const char *c;
182 size_t len;
183
184 c = memchr(u->field_data, '=', u->field_length);
185 if (!c || c == u->field_data) {
186 log_error("Invalid field.");
187 return -EINVAL;
188 }
189
190 len = c - (const char*)u->field_data;
191
192 /* need space for label + '\n' */
193 if (size - pos < len + 1)
194 return pos;
195
196 memcpy(buf + pos, u->field_data, len);
197 buf[pos + len] = '\n';
198 pos += len + 1;
199
200 u->field_pos = len + 1;
201 u->entry_state ++;
202 } /* fall through */
203
204 case ENTRY_BINARY_FIELD_SIZE: {
205 uint64_t le64;
206
207 /* need space for uint64_t */
208 if (size - pos < 8)
209 return pos;
210
211 le64 = htole64(u->field_length - u->field_pos);
212 memcpy(buf + pos, &le64, 8);
213 pos += 8;
214
215 u->entry_state ++;
216 continue;
217 }
218
219 case ENTRY_OUTRO:
220 /* need space for '\n' */
221 if (size - pos < 1)
222 return pos;
223
224 buf[pos++] = '\n';
225 u->entry_state ++;
226 u->entries_sent ++;
227
228 return pos;
229
230 default:
231 assert_not_reached("WTF?");
232 }
233 }
234 assert_not_reached("WTF?");
235 }
236
237 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
238 Uploader *u = userp;
239 int r;
240 sd_journal *j;
241 size_t filled = 0;
242 ssize_t w;
243
244 assert(u);
245 assert(nmemb <= SSIZE_MAX / size);
246
247 j = u->journal;
248
249 while (j && filled < size * nmemb) {
250 if (u->entry_state == ENTRY_DONE) {
251 r = sd_journal_next(j);
252 if (r < 0) {
253 log_error("Failed to move to next entry in journal: %s",
254 strerror(-r));
255 return CURL_READFUNC_ABORT;
256 } else if (r == 0) {
257 if (u->input_event)
258 log_debug("No more entries, waiting for journal.");
259 else {
260 log_info("No more entries, closing journal.");
261 close_journal_input(u);
262 }
263
264 u->uploading = false;
265
266 break;
267 }
268
269 u->entry_state = ENTRY_CURSOR;
270 }
271
272 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
273 if (w < 0)
274 return CURL_READFUNC_ABORT;
275 filled += w;
276
277 if (filled == 0) {
278 log_error("Buffer space is too small to write entry.");
279 return CURL_READFUNC_ABORT;
280 } else if (u->entry_state != ENTRY_DONE)
281 /* This means that all available space was used up */
282 break;
283
284 log_debug("Entry %zu (%s) has been uploaded.",
285 u->entries_sent, u->current_cursor);
286 }
287
288 return filled;
289 }
290
291 void close_journal_input(Uploader *u) {
292 assert(u);
293
294 if (u->journal) {
295 log_debug("Closing journal input.");
296
297 sd_journal_close(u->journal);
298 u->journal = NULL;
299 }
300 u->timeout = 0;
301 }
302
303 static int process_journal_input(Uploader *u, int skip) {
304 int r;
305
306 r = sd_journal_next_skip(u->journal, skip);
307 if (r < 0) {
308 log_error("Failed to skip to next entry: %s", strerror(-r));
309 return r;
310 } else if (r < skip)
311 return 0;
312
313 /* have data */
314 u->entry_state = ENTRY_CURSOR;
315 return start_upload(u, journal_input_callback, u);
316 }
317
318 int check_journal_input(Uploader *u) {
319 if (u->input_event) {
320 int r;
321
322 r = sd_journal_process(u->journal);
323 if (r < 0) {
324 log_error("Failed to process journal: %s", strerror(-r));
325 close_journal_input(u);
326 return r;
327 }
328
329 if (r == SD_JOURNAL_NOP)
330 return 0;
331 }
332
333 return process_journal_input(u, 1);
334 }
335
336 static int dispatch_journal_input(sd_event_source *event,
337 int fd,
338 uint32_t revents,
339 void *userp) {
340 Uploader *u = userp;
341
342 assert(u);
343
344 if (u->uploading) {
345 log_warning("dispatch_journal_input called when uploading, ignoring.");
346 return 0;
347 }
348
349 log_debug("Detected journal input, checking for new data.");
350 return check_journal_input(u);
351 }
352
353 int open_journal_for_upload(Uploader *u,
354 sd_journal *j,
355 const char *cursor,
356 bool after_cursor,
357 bool follow) {
358 int fd, r, events;
359
360 u->journal = j;
361
362 sd_journal_set_data_threshold(j, 0);
363
364 if (follow) {
365 fd = sd_journal_get_fd(j);
366 if (fd < 0) {
367 log_error("sd_journal_get_fd failed: %s", strerror(-fd));
368 return fd;
369 }
370
371 events = sd_journal_get_events(j);
372
373 r = sd_journal_reliable_fd(j);
374 assert(r >= 0);
375 if (r > 0)
376 u->timeout = -1;
377 else
378 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
379
380 r = sd_event_add_io(u->events, &u->input_event,
381 fd, events, dispatch_journal_input, u);
382 if (r < 0) {
383 log_error("Failed to register input event: %s", strerror(-r));
384 return r;
385 }
386
387 log_debug("Listening for journal events on fd:%d, timeout %d",
388 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
389 } else
390 log_debug("Not listening for journal events.");
391
392 if (cursor) {
393 r = sd_journal_seek_cursor(j, cursor);
394 if (r < 0) {
395 log_error("Failed to seek to cursor %s: %s",
396 cursor, strerror(-r));
397 return r;
398 }
399 }
400
401 return process_journal_input(u, 1 + !!after_cursor);
402 }