]>
Commit | Line | Data |
---|---|---|
db9ecf05 | 1 | /* SPDX-License-Identifier: LGPL-2.1-or-later */ |
b5efdb8a | 2 | |
eacbb4d3 | 3 | #include <curl/curl.h> |
cf0fbc49 | 4 | #include <stdbool.h> |
eacbb4d3 | 5 | |
0ab896b3 ZJS |
6 | #include "sd-daemon.h" |
7 | ||
b5efdb8a LP |
8 | #include "alloc-util.h" |
9 | #include "journal-upload.h" | |
eacbb4d3 | 10 | #include "log.h" |
0ab896b3 | 11 | #include "string-util.h" |
eacbb4d3 | 12 | #include "utf8.h" |
b5efdb8a | 13 | #include "util.h" |
eacbb4d3 ZJS |
14 | |
15 | /** | |
16 | * Write up to size bytes to buf. Return negative on error, and number of | |
17 | * bytes written otherwise. The last case is a kind of an error too. | |
18 | */ | |
19 | static ssize_t write_entry(char *buf, size_t size, Uploader *u) { | |
20 | int r; | |
21 | size_t pos = 0; | |
22 | ||
23 | assert(size <= SSIZE_MAX); | |
24 | ||
57255510 | 25 | for (;;) { |
eacbb4d3 ZJS |
26 | |
27 | switch(u->entry_state) { | |
28 | case ENTRY_CURSOR: { | |
a1e58e8e | 29 | u->current_cursor = mfree(u->current_cursor); |
eacbb4d3 | 30 | |
722b6795 | 31 | r = sd_journal_get_cursor(u->journal, &u->current_cursor); |
eb56eb9b MS |
32 | if (r < 0) |
33 | return log_error_errno(r, "Failed to get cursor: %m"); | |
eacbb4d3 ZJS |
34 | |
35 | r = snprintf(buf + pos, size - pos, | |
722b6795 | 36 | "__CURSOR=%s\n", u->current_cursor); |
91db8ed5 ZJS |
37 | assert(r >= 0); |
38 | if ((size_t) r > size - pos) | |
eacbb4d3 ZJS |
39 | /* not enough space */ |
40 | return pos; | |
41 | ||
313cefa1 | 42 | u->entry_state++; |
eacbb4d3 ZJS |
43 | |
44 | if (pos + r == size) { | |
45 | /* exactly one character short, but we don't need it */ | |
46 | buf[size - 1] = '\n'; | |
47 | return size; | |
48 | } | |
49 | ||
50 | pos += r; | |
4831981d SL |
51 | } |
52 | _fallthrough_; | |
eacbb4d3 ZJS |
53 | case ENTRY_REALTIME: { |
54 | usec_t realtime; | |
55 | ||
56 | r = sd_journal_get_realtime_usec(u->journal, &realtime); | |
eb56eb9b MS |
57 | if (r < 0) |
58 | return log_error_errno(r, "Failed to get realtime timestamp: %m"); | |
eacbb4d3 ZJS |
59 | |
60 | r = snprintf(buf + pos, size - pos, | |
61 | "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime); | |
91db8ed5 ZJS |
62 | assert(r >= 0); |
63 | if ((size_t) r > size - pos) | |
eacbb4d3 ZJS |
64 | /* not enough space */ |
65 | return pos; | |
66 | ||
313cefa1 | 67 | u->entry_state++; |
eacbb4d3 ZJS |
68 | |
69 | if (r + pos == size) { | |
70 | /* exactly one character short, but we don't need it */ | |
71 | buf[size - 1] = '\n'; | |
72 | return size; | |
73 | } | |
74 | ||
75 | pos += r; | |
4831981d SL |
76 | } |
77 | _fallthrough_; | |
eacbb4d3 ZJS |
78 | case ENTRY_MONOTONIC: { |
79 | usec_t monotonic; | |
80 | sd_id128_t boot_id; | |
81 | ||
82 | r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); | |
eb56eb9b MS |
83 | if (r < 0) |
84 | return log_error_errno(r, "Failed to get monotonic timestamp: %m"); | |
eacbb4d3 ZJS |
85 | |
86 | r = snprintf(buf + pos, size - pos, | |
87 | "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic); | |
91db8ed5 ZJS |
88 | assert(r >= 0); |
89 | if ((size_t) r > size - pos) | |
eacbb4d3 ZJS |
90 | /* not enough space */ |
91 | return pos; | |
92 | ||
313cefa1 | 93 | u->entry_state++; |
eacbb4d3 ZJS |
94 | |
95 | if (r + pos == size) { | |
96 | /* exactly one character short, but we don't need it */ | |
97 | buf[size - 1] = '\n'; | |
98 | return size; | |
99 | } | |
100 | ||
101 | pos += r; | |
4831981d SL |
102 | } |
103 | _fallthrough_; | |
eacbb4d3 ZJS |
104 | case ENTRY_BOOT_ID: { |
105 | sd_id128_t boot_id; | |
5905d7cf | 106 | char sid[SD_ID128_STRING_MAX]; |
eacbb4d3 ZJS |
107 | |
108 | r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); | |
eb56eb9b MS |
109 | if (r < 0) |
110 | return log_error_errno(r, "Failed to get monotonic timestamp: %m"); | |
eacbb4d3 ZJS |
111 | |
112 | r = snprintf(buf + pos, size - pos, | |
113 | "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid)); | |
91db8ed5 ZJS |
114 | assert(r >= 0); |
115 | if ((size_t) r > size - pos) | |
eacbb4d3 ZJS |
116 | /* not enough space */ |
117 | return pos; | |
118 | ||
313cefa1 | 119 | u->entry_state++; |
eacbb4d3 ZJS |
120 | |
121 | if (r + pos == size) { | |
122 | /* exactly one character short, but we don't need it */ | |
123 | buf[size - 1] = '\n'; | |
124 | return size; | |
125 | } | |
126 | ||
127 | pos += r; | |
4831981d SL |
128 | } |
129 | _fallthrough_; | |
eacbb4d3 ZJS |
130 | case ENTRY_NEW_FIELD: { |
131 | u->field_pos = 0; | |
132 | ||
133 | r = sd_journal_enumerate_data(u->journal, | |
134 | &u->field_data, | |
135 | &u->field_length); | |
eb56eb9b MS |
136 | if (r < 0) |
137 | return log_error_errno(r, "Failed to move to next field in entry: %m"); | |
138 | else if (r == 0) { | |
eacbb4d3 ZJS |
139 | u->entry_state = ENTRY_OUTRO; |
140 | continue; | |
141 | } | |
142 | ||
0ab896b3 ZJS |
143 | /* We already printed the boot id from the data in |
144 | * the header, hence let's suppress it here */ | |
145 | if (memory_startswith(u->field_data, u->field_length, "_BOOT_ID=")) | |
146 | continue; | |
147 | ||
148 | if (!utf8_is_printable_newline(u->field_data, u->field_length, false)) { | |
eacbb4d3 ZJS |
149 | u->entry_state = ENTRY_BINARY_FIELD_START; |
150 | continue; | |
151 | } | |
152 | ||
313cefa1 | 153 | u->entry_state++; |
4831981d SL |
154 | } |
155 | _fallthrough_; | |
eacbb4d3 ZJS |
156 | case ENTRY_TEXT_FIELD: |
157 | case ENTRY_BINARY_FIELD: { | |
158 | bool done; | |
159 | size_t tocopy; | |
160 | ||
161 | done = size - pos > u->field_length - u->field_pos; | |
162 | if (done) | |
163 | tocopy = u->field_length - u->field_pos; | |
164 | else | |
165 | tocopy = size - pos; | |
166 | ||
167 | memcpy(buf + pos, | |
168 | (char*) u->field_data + u->field_pos, | |
169 | tocopy); | |
170 | ||
171 | if (done) { | |
172 | buf[pos + tocopy] = '\n'; | |
173 | pos += tocopy + 1; | |
174 | u->entry_state = ENTRY_NEW_FIELD; | |
175 | continue; | |
176 | } else { | |
177 | u->field_pos += tocopy; | |
178 | return size; | |
179 | } | |
180 | } | |
181 | ||
182 | case ENTRY_BINARY_FIELD_START: { | |
183 | const char *c; | |
184 | size_t len; | |
185 | ||
186 | c = memchr(u->field_data, '=', u->field_length); | |
baaa35ad ZJS |
187 | if (!c || c == u->field_data) |
188 | return log_error_errno(SYNTHETIC_ERRNO(EINVAL), | |
189 | "Invalid field."); | |
eacbb4d3 ZJS |
190 | |
191 | len = c - (const char*)u->field_data; | |
192 | ||
193 | /* need space for label + '\n' */ | |
194 | if (size - pos < len + 1) | |
195 | return pos; | |
196 | ||
197 | memcpy(buf + pos, u->field_data, len); | |
198 | buf[pos + len] = '\n'; | |
199 | pos += len + 1; | |
200 | ||
201 | u->field_pos = len + 1; | |
313cefa1 | 202 | u->entry_state++; |
4831981d SL |
203 | } |
204 | _fallthrough_; | |
eacbb4d3 ZJS |
205 | case ENTRY_BINARY_FIELD_SIZE: { |
206 | uint64_t le64; | |
207 | ||
208 | /* need space for uint64_t */ | |
209 | if (size - pos < 8) | |
210 | return pos; | |
211 | ||
212 | le64 = htole64(u->field_length - u->field_pos); | |
213 | memcpy(buf + pos, &le64, 8); | |
214 | pos += 8; | |
215 | ||
313cefa1 | 216 | u->entry_state++; |
eacbb4d3 ZJS |
217 | continue; |
218 | } | |
219 | ||
220 | case ENTRY_OUTRO: | |
221 | /* need space for '\n' */ | |
222 | if (size - pos < 1) | |
223 | return pos; | |
224 | ||
225 | buf[pos++] = '\n'; | |
313cefa1 VC |
226 | u->entry_state++; |
227 | u->entries_sent++; | |
eacbb4d3 ZJS |
228 | |
229 | return pos; | |
230 | ||
231 | default: | |
232 | assert_not_reached("WTF?"); | |
233 | } | |
234 | } | |
235 | assert_not_reached("WTF?"); | |
236 | } | |
237 | ||
a1e92eee | 238 | static void check_update_watchdog(Uploader *u) { |
d79ca7a6 KC |
239 | usec_t after; |
240 | usec_t elapsed_time; | |
241 | ||
0aa176a7 | 242 | if (u->watchdog_usec <= 0) |
d79ca7a6 | 243 | return; |
0aa176a7 ZJS |
244 | |
245 | after = now(CLOCK_MONOTONIC); | |
54d8ef14 | 246 | elapsed_time = usec_sub_unsigned(after, u->watchdog_timestamp); |
0aa176a7 ZJS |
247 | if (elapsed_time > u->watchdog_usec / 2) { |
248 | log_debug("Update watchdog timer"); | |
249 | sd_notify(false, "WATCHDOG=1"); | |
250 | u->watchdog_timestamp = after; | |
d79ca7a6 KC |
251 | } |
252 | } | |
253 | ||
eacbb4d3 ZJS |
254 | static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { |
255 | Uploader *u = userp; | |
256 | int r; | |
257 | sd_journal *j; | |
258 | size_t filled = 0; | |
259 | ssize_t w; | |
260 | ||
261 | assert(u); | |
262 | assert(nmemb <= SSIZE_MAX / size); | |
263 | ||
d79ca7a6 KC |
264 | check_update_watchdog(u); |
265 | ||
eacbb4d3 ZJS |
266 | j = u->journal; |
267 | ||
268 | while (j && filled < size * nmemb) { | |
269 | if (u->entry_state == ENTRY_DONE) { | |
270 | r = sd_journal_next(j); | |
271 | if (r < 0) { | |
c33b3297 | 272 | log_error_errno(r, "Failed to move to next entry in journal: %m"); |
eacbb4d3 ZJS |
273 | return CURL_READFUNC_ABORT; |
274 | } else if (r == 0) { | |
275 | if (u->input_event) | |
276 | log_debug("No more entries, waiting for journal."); | |
277 | else { | |
278 | log_info("No more entries, closing journal."); | |
279 | close_journal_input(u); | |
280 | } | |
281 | ||
282 | u->uploading = false; | |
283 | ||
284 | break; | |
285 | } | |
286 | ||
287 | u->entry_state = ENTRY_CURSOR; | |
288 | } | |
289 | ||
290 | w = write_entry((char*)buf + filled, size * nmemb - filled, u); | |
291 | if (w < 0) | |
292 | return CURL_READFUNC_ABORT; | |
293 | filled += w; | |
294 | ||
295 | if (filled == 0) { | |
296 | log_error("Buffer space is too small to write entry."); | |
297 | return CURL_READFUNC_ABORT; | |
298 | } else if (u->entry_state != ENTRY_DONE) | |
299 | /* This means that all available space was used up */ | |
300 | break; | |
301 | ||
302 | log_debug("Entry %zu (%s) has been uploaded.", | |
722b6795 | 303 | u->entries_sent, u->current_cursor); |
eacbb4d3 ZJS |
304 | } |
305 | ||
306 | return filled; | |
307 | } | |
308 | ||
309 | void close_journal_input(Uploader *u) { | |
310 | assert(u); | |
311 | ||
312 | if (u->journal) { | |
313 | log_debug("Closing journal input."); | |
314 | ||
315 | sd_journal_close(u->journal); | |
316 | u->journal = NULL; | |
317 | } | |
318 | u->timeout = 0; | |
319 | } | |
320 | ||
321 | static int process_journal_input(Uploader *u, int skip) { | |
322 | int r; | |
323 | ||
8a3db16d KC |
324 | if (u->uploading) |
325 | return 0; | |
326 | ||
eacbb4d3 | 327 | r = sd_journal_next_skip(u->journal, skip); |
eb56eb9b MS |
328 | if (r < 0) |
329 | return log_error_errno(r, "Failed to skip to next entry: %m"); | |
330 | else if (r < skip) | |
eacbb4d3 ZJS |
331 | return 0; |
332 | ||
333 | /* have data */ | |
334 | u->entry_state = ENTRY_CURSOR; | |
335 | return start_upload(u, journal_input_callback, u); | |
336 | } | |
337 | ||
338 | int check_journal_input(Uploader *u) { | |
339 | if (u->input_event) { | |
340 | int r; | |
341 | ||
342 | r = sd_journal_process(u->journal); | |
343 | if (r < 0) { | |
da927ba9 | 344 | log_error_errno(r, "Failed to process journal: %m"); |
eacbb4d3 ZJS |
345 | close_journal_input(u); |
346 | return r; | |
347 | } | |
348 | ||
349 | if (r == SD_JOURNAL_NOP) | |
350 | return 0; | |
351 | } | |
352 | ||
353 | return process_journal_input(u, 1); | |
354 | } | |
355 | ||
356 | static int dispatch_journal_input(sd_event_source *event, | |
357 | int fd, | |
358 | uint32_t revents, | |
359 | void *userp) { | |
360 | Uploader *u = userp; | |
361 | ||
362 | assert(u); | |
363 | ||
8a3db16d | 364 | if (u->uploading) |
eacbb4d3 | 365 | return 0; |
eacbb4d3 ZJS |
366 | |
367 | log_debug("Detected journal input, checking for new data."); | |
368 | return check_journal_input(u); | |
369 | } | |
370 | ||
371 | int open_journal_for_upload(Uploader *u, | |
372 | sd_journal *j, | |
373 | const char *cursor, | |
374 | bool after_cursor, | |
375 | bool follow) { | |
376 | int fd, r, events; | |
377 | ||
378 | u->journal = j; | |
379 | ||
380 | sd_journal_set_data_threshold(j, 0); | |
381 | ||
382 | if (follow) { | |
383 | fd = sd_journal_get_fd(j); | |
eb56eb9b MS |
384 | if (fd < 0) |
385 | return log_error_errno(fd, "sd_journal_get_fd failed: %m"); | |
eacbb4d3 ZJS |
386 | |
387 | events = sd_journal_get_events(j); | |
388 | ||
389 | r = sd_journal_reliable_fd(j); | |
390 | assert(r >= 0); | |
391 | if (r > 0) | |
392 | u->timeout = -1; | |
393 | else | |
394 | u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT; | |
395 | ||
396 | r = sd_event_add_io(u->events, &u->input_event, | |
397 | fd, events, dispatch_journal_input, u); | |
eb56eb9b MS |
398 | if (r < 0) |
399 | return log_error_errno(r, "Failed to register input event: %m"); | |
eacbb4d3 ZJS |
400 | |
401 | log_debug("Listening for journal events on fd:%d, timeout %d", | |
402 | fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout); | |
403 | } else | |
404 | log_debug("Not listening for journal events."); | |
405 | ||
406 | if (cursor) { | |
407 | r = sd_journal_seek_cursor(j, cursor); | |
ece174c5 | 408 | if (r < 0) |
eb56eb9b MS |
409 | return log_error_errno(r, "Failed to seek to cursor %s: %m", |
410 | cursor); | |
eacbb4d3 ZJS |
411 | } |
412 | ||
bc48b25a | 413 | return process_journal_input(u, !!after_cursor); |
eacbb4d3 | 414 | } |