]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
tree-wide: drop {} from one-line if blocks
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 #include <stdbool.h>
2
3 #include <curl/curl.h>
4
5 #include "util.h"
6 #include "log.h"
7 #include "utf8.h"
8 #include "journal-upload.h"
9
10 /**
11 * Write up to size bytes to buf. Return negative on error, and number of
12 * bytes written otherwise. The last case is a kind of an error too.
13 */
14 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
15 int r;
16 size_t pos = 0;
17
18 assert(size <= SSIZE_MAX);
19
20 while (true) {
21
22 switch(u->entry_state) {
23 case ENTRY_CURSOR: {
24 u->current_cursor = mfree(u->current_cursor);
25
26 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
27 if (r < 0)
28 return log_error_errno(r, "Failed to get cursor: %m");
29
30 r = snprintf(buf + pos, size - pos,
31 "__CURSOR=%s\n", u->current_cursor);
32 if (pos + r > size)
33 /* not enough space */
34 return pos;
35
36 u->entry_state ++;
37
38 if (pos + r == size) {
39 /* exactly one character short, but we don't need it */
40 buf[size - 1] = '\n';
41 return size;
42 }
43
44 pos += r;
45 } /* fall through */
46
47 case ENTRY_REALTIME: {
48 usec_t realtime;
49
50 r = sd_journal_get_realtime_usec(u->journal, &realtime);
51 if (r < 0)
52 return log_error_errno(r, "Failed to get realtime timestamp: %m");
53
54 r = snprintf(buf + pos, size - pos,
55 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
56 if (r + pos > size)
57 /* not enough space */
58 return pos;
59
60 u->entry_state ++;
61
62 if (r + pos == size) {
63 /* exactly one character short, but we don't need it */
64 buf[size - 1] = '\n';
65 return size;
66 }
67
68 pos += r;
69 } /* fall through */
70
71 case ENTRY_MONOTONIC: {
72 usec_t monotonic;
73 sd_id128_t boot_id;
74
75 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
76 if (r < 0)
77 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
78
79 r = snprintf(buf + pos, size - pos,
80 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
81 if (r + pos > size)
82 /* not enough space */
83 return pos;
84
85 u->entry_state ++;
86
87 if (r + pos == size) {
88 /* exactly one character short, but we don't need it */
89 buf[size - 1] = '\n';
90 return size;
91 }
92
93 pos += r;
94 } /* fall through */
95
96 case ENTRY_BOOT_ID: {
97 sd_id128_t boot_id;
98 char sid[33];
99
100 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
101 if (r < 0)
102 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
103
104 r = snprintf(buf + pos, size - pos,
105 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
106 if (r + pos > size)
107 /* not enough space */
108 return pos;
109
110 u->entry_state ++;
111
112 if (r + pos == size) {
113 /* exactly one character short, but we don't need it */
114 buf[size - 1] = '\n';
115 return size;
116 }
117
118 pos += r;
119 } /* fall through */
120
121 case ENTRY_NEW_FIELD: {
122 u->field_pos = 0;
123
124 r = sd_journal_enumerate_data(u->journal,
125 &u->field_data,
126 &u->field_length);
127 if (r < 0)
128 return log_error_errno(r, "Failed to move to next field in entry: %m");
129 else if (r == 0) {
130 u->entry_state = ENTRY_OUTRO;
131 continue;
132 }
133
134 if (!utf8_is_printable_newline(u->field_data,
135 u->field_length, false)) {
136 u->entry_state = ENTRY_BINARY_FIELD_START;
137 continue;
138 }
139
140 u->entry_state ++;
141 } /* fall through */
142
143 case ENTRY_TEXT_FIELD:
144 case ENTRY_BINARY_FIELD: {
145 bool done;
146 size_t tocopy;
147
148 done = size - pos > u->field_length - u->field_pos;
149 if (done)
150 tocopy = u->field_length - u->field_pos;
151 else
152 tocopy = size - pos;
153
154 memcpy(buf + pos,
155 (char*) u->field_data + u->field_pos,
156 tocopy);
157
158 if (done) {
159 buf[pos + tocopy] = '\n';
160 pos += tocopy + 1;
161 u->entry_state = ENTRY_NEW_FIELD;
162 continue;
163 } else {
164 u->field_pos += tocopy;
165 return size;
166 }
167 }
168
169 case ENTRY_BINARY_FIELD_START: {
170 const char *c;
171 size_t len;
172
173 c = memchr(u->field_data, '=', u->field_length);
174 if (!c || c == u->field_data) {
175 log_error("Invalid field.");
176 return -EINVAL;
177 }
178
179 len = c - (const char*)u->field_data;
180
181 /* need space for label + '\n' */
182 if (size - pos < len + 1)
183 return pos;
184
185 memcpy(buf + pos, u->field_data, len);
186 buf[pos + len] = '\n';
187 pos += len + 1;
188
189 u->field_pos = len + 1;
190 u->entry_state ++;
191 } /* fall through */
192
193 case ENTRY_BINARY_FIELD_SIZE: {
194 uint64_t le64;
195
196 /* need space for uint64_t */
197 if (size - pos < 8)
198 return pos;
199
200 le64 = htole64(u->field_length - u->field_pos);
201 memcpy(buf + pos, &le64, 8);
202 pos += 8;
203
204 u->entry_state ++;
205 continue;
206 }
207
208 case ENTRY_OUTRO:
209 /* need space for '\n' */
210 if (size - pos < 1)
211 return pos;
212
213 buf[pos++] = '\n';
214 u->entry_state ++;
215 u->entries_sent ++;
216
217 return pos;
218
219 default:
220 assert_not_reached("WTF?");
221 }
222 }
223 assert_not_reached("WTF?");
224 }
225
226 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
227 Uploader *u = userp;
228 int r;
229 sd_journal *j;
230 size_t filled = 0;
231 ssize_t w;
232
233 assert(u);
234 assert(nmemb <= SSIZE_MAX / size);
235
236 j = u->journal;
237
238 while (j && filled < size * nmemb) {
239 if (u->entry_state == ENTRY_DONE) {
240 r = sd_journal_next(j);
241 if (r < 0) {
242 log_error_errno(r, "Failed to move to next entry in journal: %m");
243 return CURL_READFUNC_ABORT;
244 } else if (r == 0) {
245 if (u->input_event)
246 log_debug("No more entries, waiting for journal.");
247 else {
248 log_info("No more entries, closing journal.");
249 close_journal_input(u);
250 }
251
252 u->uploading = false;
253
254 break;
255 }
256
257 u->entry_state = ENTRY_CURSOR;
258 }
259
260 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
261 if (w < 0)
262 return CURL_READFUNC_ABORT;
263 filled += w;
264
265 if (filled == 0) {
266 log_error("Buffer space is too small to write entry.");
267 return CURL_READFUNC_ABORT;
268 } else if (u->entry_state != ENTRY_DONE)
269 /* This means that all available space was used up */
270 break;
271
272 log_debug("Entry %zu (%s) has been uploaded.",
273 u->entries_sent, u->current_cursor);
274 }
275
276 return filled;
277 }
278
279 void close_journal_input(Uploader *u) {
280 assert(u);
281
282 if (u->journal) {
283 log_debug("Closing journal input.");
284
285 sd_journal_close(u->journal);
286 u->journal = NULL;
287 }
288 u->timeout = 0;
289 }
290
291 static int process_journal_input(Uploader *u, int skip) {
292 int r;
293
294 r = sd_journal_next_skip(u->journal, skip);
295 if (r < 0)
296 return log_error_errno(r, "Failed to skip to next entry: %m");
297 else if (r < skip)
298 return 0;
299
300 /* have data */
301 u->entry_state = ENTRY_CURSOR;
302 return start_upload(u, journal_input_callback, u);
303 }
304
305 int check_journal_input(Uploader *u) {
306 if (u->input_event) {
307 int r;
308
309 r = sd_journal_process(u->journal);
310 if (r < 0) {
311 log_error_errno(r, "Failed to process journal: %m");
312 close_journal_input(u);
313 return r;
314 }
315
316 if (r == SD_JOURNAL_NOP)
317 return 0;
318 }
319
320 return process_journal_input(u, 1);
321 }
322
323 static int dispatch_journal_input(sd_event_source *event,
324 int fd,
325 uint32_t revents,
326 void *userp) {
327 Uploader *u = userp;
328
329 assert(u);
330
331 if (u->uploading) {
332 log_warning("dispatch_journal_input called when uploading, ignoring.");
333 return 0;
334 }
335
336 log_debug("Detected journal input, checking for new data.");
337 return check_journal_input(u);
338 }
339
340 int open_journal_for_upload(Uploader *u,
341 sd_journal *j,
342 const char *cursor,
343 bool after_cursor,
344 bool follow) {
345 int fd, r, events;
346
347 u->journal = j;
348
349 sd_journal_set_data_threshold(j, 0);
350
351 if (follow) {
352 fd = sd_journal_get_fd(j);
353 if (fd < 0)
354 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
355
356 events = sd_journal_get_events(j);
357
358 r = sd_journal_reliable_fd(j);
359 assert(r >= 0);
360 if (r > 0)
361 u->timeout = -1;
362 else
363 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
364
365 r = sd_event_add_io(u->events, &u->input_event,
366 fd, events, dispatch_journal_input, u);
367 if (r < 0)
368 return log_error_errno(r, "Failed to register input event: %m");
369
370 log_debug("Listening for journal events on fd:%d, timeout %d",
371 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
372 } else
373 log_debug("Not listening for journal events.");
374
375 if (cursor) {
376 r = sd_journal_seek_cursor(j, cursor);
377 if (r < 0)
378 return log_error_errno(r, "Failed to seek to cursor %s: %m",
379 cursor);
380 }
381
382 return process_journal_input(u, 1 + !!after_cursor);
383 }