]>
Commit | Line | Data |
---|---|---|
eacbb4d3 ZJS |
1 | #include <stdbool.h> |
2 | ||
3 | #include <curl/curl.h> | |
4 | ||
5 | #include "util.h" | |
6 | #include "log.h" | |
7 | #include "utf8.h" | |
8 | #include "journal-upload.h" | |
9 | ||
10 | /** | |
11 | * Write up to size bytes to buf. Return negative on error, and number of | |
12 | * bytes written otherwise. The last case is a kind of an error too. | |
13 | */ | |
14 | static ssize_t write_entry(char *buf, size_t size, Uploader *u) { | |
15 | int r; | |
16 | size_t pos = 0; | |
17 | ||
18 | assert(size <= SSIZE_MAX); | |
19 | ||
20 | while (true) { | |
21 | ||
22 | switch(u->entry_state) { | |
23 | case ENTRY_CURSOR: { | |
722b6795 ZJS |
24 | free(u->current_cursor); |
25 | u->current_cursor = NULL; | |
eacbb4d3 | 26 | |
722b6795 | 27 | r = sd_journal_get_cursor(u->journal, &u->current_cursor); |
eb56eb9b MS |
28 | if (r < 0) |
29 | return log_error_errno(r, "Failed to get cursor: %m"); | |
eacbb4d3 ZJS |
30 | |
31 | r = snprintf(buf + pos, size - pos, | |
722b6795 | 32 | "__CURSOR=%s\n", u->current_cursor); |
eacbb4d3 ZJS |
33 | if (pos + r > size) |
34 | /* not enough space */ | |
35 | return pos; | |
36 | ||
37 | u->entry_state ++; | |
38 | ||
39 | if (pos + r == size) { | |
40 | /* exactly one character short, but we don't need it */ | |
41 | buf[size - 1] = '\n'; | |
42 | return size; | |
43 | } | |
44 | ||
45 | pos += r; | |
46 | } /* fall through */ | |
47 | ||
48 | case ENTRY_REALTIME: { | |
49 | usec_t realtime; | |
50 | ||
51 | r = sd_journal_get_realtime_usec(u->journal, &realtime); | |
eb56eb9b MS |
52 | if (r < 0) |
53 | return log_error_errno(r, "Failed to get realtime timestamp: %m"); | |
eacbb4d3 ZJS |
54 | |
55 | r = snprintf(buf + pos, size - pos, | |
56 | "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime); | |
57 | if (r + pos > size) | |
58 | /* not enough space */ | |
59 | return pos; | |
60 | ||
61 | u->entry_state ++; | |
62 | ||
63 | if (r + pos == size) { | |
64 | /* exactly one character short, but we don't need it */ | |
65 | buf[size - 1] = '\n'; | |
66 | return size; | |
67 | } | |
68 | ||
69 | pos += r; | |
70 | } /* fall through */ | |
71 | ||
72 | case ENTRY_MONOTONIC: { | |
73 | usec_t monotonic; | |
74 | sd_id128_t boot_id; | |
75 | ||
76 | r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); | |
eb56eb9b MS |
77 | if (r < 0) |
78 | return log_error_errno(r, "Failed to get monotonic timestamp: %m"); | |
eacbb4d3 ZJS |
79 | |
80 | r = snprintf(buf + pos, size - pos, | |
81 | "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic); | |
82 | if (r + pos > size) | |
83 | /* not enough space */ | |
84 | return pos; | |
85 | ||
86 | u->entry_state ++; | |
87 | ||
88 | if (r + pos == size) { | |
89 | /* exactly one character short, but we don't need it */ | |
90 | buf[size - 1] = '\n'; | |
91 | return size; | |
92 | } | |
93 | ||
94 | pos += r; | |
95 | } /* fall through */ | |
96 | ||
97 | case ENTRY_BOOT_ID: { | |
98 | sd_id128_t boot_id; | |
99 | char sid[33]; | |
100 | ||
101 | r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); | |
eb56eb9b MS |
102 | if (r < 0) |
103 | return log_error_errno(r, "Failed to get monotonic timestamp: %m"); | |
eacbb4d3 ZJS |
104 | |
105 | r = snprintf(buf + pos, size - pos, | |
106 | "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid)); | |
5ffa8c81 | 107 | if (r + pos > size) |
eacbb4d3 ZJS |
108 | /* not enough space */ |
109 | return pos; | |
110 | ||
111 | u->entry_state ++; | |
112 | ||
113 | if (r + pos == size) { | |
114 | /* exactly one character short, but we don't need it */ | |
115 | buf[size - 1] = '\n'; | |
116 | return size; | |
117 | } | |
118 | ||
119 | pos += r; | |
120 | } /* fall through */ | |
121 | ||
122 | case ENTRY_NEW_FIELD: { | |
123 | u->field_pos = 0; | |
124 | ||
125 | r = sd_journal_enumerate_data(u->journal, | |
126 | &u->field_data, | |
127 | &u->field_length); | |
eb56eb9b MS |
128 | if (r < 0) |
129 | return log_error_errno(r, "Failed to move to next field in entry: %m"); | |
130 | else if (r == 0) { | |
eacbb4d3 ZJS |
131 | u->entry_state = ENTRY_OUTRO; |
132 | continue; | |
133 | } | |
134 | ||
135 | if (!utf8_is_printable_newline(u->field_data, | |
136 | u->field_length, false)) { | |
137 | u->entry_state = ENTRY_BINARY_FIELD_START; | |
138 | continue; | |
139 | } | |
140 | ||
141 | u->entry_state ++; | |
142 | } /* fall through */ | |
143 | ||
144 | case ENTRY_TEXT_FIELD: | |
145 | case ENTRY_BINARY_FIELD: { | |
146 | bool done; | |
147 | size_t tocopy; | |
148 | ||
149 | done = size - pos > u->field_length - u->field_pos; | |
150 | if (done) | |
151 | tocopy = u->field_length - u->field_pos; | |
152 | else | |
153 | tocopy = size - pos; | |
154 | ||
155 | memcpy(buf + pos, | |
156 | (char*) u->field_data + u->field_pos, | |
157 | tocopy); | |
158 | ||
159 | if (done) { | |
160 | buf[pos + tocopy] = '\n'; | |
161 | pos += tocopy + 1; | |
162 | u->entry_state = ENTRY_NEW_FIELD; | |
163 | continue; | |
164 | } else { | |
165 | u->field_pos += tocopy; | |
166 | return size; | |
167 | } | |
168 | } | |
169 | ||
170 | case ENTRY_BINARY_FIELD_START: { | |
171 | const char *c; | |
172 | size_t len; | |
173 | ||
174 | c = memchr(u->field_data, '=', u->field_length); | |
175 | if (!c || c == u->field_data) { | |
176 | log_error("Invalid field."); | |
177 | return -EINVAL; | |
178 | } | |
179 | ||
180 | len = c - (const char*)u->field_data; | |
181 | ||
182 | /* need space for label + '\n' */ | |
183 | if (size - pos < len + 1) | |
184 | return pos; | |
185 | ||
186 | memcpy(buf + pos, u->field_data, len); | |
187 | buf[pos + len] = '\n'; | |
188 | pos += len + 1; | |
189 | ||
190 | u->field_pos = len + 1; | |
191 | u->entry_state ++; | |
192 | } /* fall through */ | |
193 | ||
194 | case ENTRY_BINARY_FIELD_SIZE: { | |
195 | uint64_t le64; | |
196 | ||
197 | /* need space for uint64_t */ | |
198 | if (size - pos < 8) | |
199 | return pos; | |
200 | ||
201 | le64 = htole64(u->field_length - u->field_pos); | |
202 | memcpy(buf + pos, &le64, 8); | |
203 | pos += 8; | |
204 | ||
205 | u->entry_state ++; | |
206 | continue; | |
207 | } | |
208 | ||
209 | case ENTRY_OUTRO: | |
210 | /* need space for '\n' */ | |
211 | if (size - pos < 1) | |
212 | return pos; | |
213 | ||
214 | buf[pos++] = '\n'; | |
215 | u->entry_state ++; | |
216 | u->entries_sent ++; | |
217 | ||
218 | return pos; | |
219 | ||
220 | default: | |
221 | assert_not_reached("WTF?"); | |
222 | } | |
223 | } | |
224 | assert_not_reached("WTF?"); | |
225 | } | |
226 | ||
227 | static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { | |
228 | Uploader *u = userp; | |
229 | int r; | |
230 | sd_journal *j; | |
231 | size_t filled = 0; | |
232 | ssize_t w; | |
233 | ||
234 | assert(u); | |
235 | assert(nmemb <= SSIZE_MAX / size); | |
236 | ||
237 | j = u->journal; | |
238 | ||
239 | while (j && filled < size * nmemb) { | |
240 | if (u->entry_state == ENTRY_DONE) { | |
241 | r = sd_journal_next(j); | |
242 | if (r < 0) { | |
c33b3297 | 243 | log_error_errno(r, "Failed to move to next entry in journal: %m"); |
eacbb4d3 ZJS |
244 | return CURL_READFUNC_ABORT; |
245 | } else if (r == 0) { | |
246 | if (u->input_event) | |
247 | log_debug("No more entries, waiting for journal."); | |
248 | else { | |
249 | log_info("No more entries, closing journal."); | |
250 | close_journal_input(u); | |
251 | } | |
252 | ||
253 | u->uploading = false; | |
254 | ||
255 | break; | |
256 | } | |
257 | ||
258 | u->entry_state = ENTRY_CURSOR; | |
259 | } | |
260 | ||
261 | w = write_entry((char*)buf + filled, size * nmemb - filled, u); | |
262 | if (w < 0) | |
263 | return CURL_READFUNC_ABORT; | |
264 | filled += w; | |
265 | ||
266 | if (filled == 0) { | |
267 | log_error("Buffer space is too small to write entry."); | |
268 | return CURL_READFUNC_ABORT; | |
269 | } else if (u->entry_state != ENTRY_DONE) | |
270 | /* This means that all available space was used up */ | |
271 | break; | |
272 | ||
273 | log_debug("Entry %zu (%s) has been uploaded.", | |
722b6795 | 274 | u->entries_sent, u->current_cursor); |
eacbb4d3 ZJS |
275 | } |
276 | ||
277 | return filled; | |
278 | } | |
279 | ||
280 | void close_journal_input(Uploader *u) { | |
281 | assert(u); | |
282 | ||
283 | if (u->journal) { | |
284 | log_debug("Closing journal input."); | |
285 | ||
286 | sd_journal_close(u->journal); | |
287 | u->journal = NULL; | |
288 | } | |
289 | u->timeout = 0; | |
290 | } | |
291 | ||
292 | static int process_journal_input(Uploader *u, int skip) { | |
293 | int r; | |
294 | ||
295 | r = sd_journal_next_skip(u->journal, skip); | |
eb56eb9b MS |
296 | if (r < 0) |
297 | return log_error_errno(r, "Failed to skip to next entry: %m"); | |
298 | else if (r < skip) | |
eacbb4d3 ZJS |
299 | return 0; |
300 | ||
301 | /* have data */ | |
302 | u->entry_state = ENTRY_CURSOR; | |
303 | return start_upload(u, journal_input_callback, u); | |
304 | } | |
305 | ||
306 | int check_journal_input(Uploader *u) { | |
307 | if (u->input_event) { | |
308 | int r; | |
309 | ||
310 | r = sd_journal_process(u->journal); | |
311 | if (r < 0) { | |
da927ba9 | 312 | log_error_errno(r, "Failed to process journal: %m"); |
eacbb4d3 ZJS |
313 | close_journal_input(u); |
314 | return r; | |
315 | } | |
316 | ||
317 | if (r == SD_JOURNAL_NOP) | |
318 | return 0; | |
319 | } | |
320 | ||
321 | return process_journal_input(u, 1); | |
322 | } | |
323 | ||
324 | static int dispatch_journal_input(sd_event_source *event, | |
325 | int fd, | |
326 | uint32_t revents, | |
327 | void *userp) { | |
328 | Uploader *u = userp; | |
329 | ||
330 | assert(u); | |
331 | ||
332 | if (u->uploading) { | |
333 | log_warning("dispatch_journal_input called when uploading, ignoring."); | |
334 | return 0; | |
335 | } | |
336 | ||
337 | log_debug("Detected journal input, checking for new data."); | |
338 | return check_journal_input(u); | |
339 | } | |
340 | ||
341 | int open_journal_for_upload(Uploader *u, | |
342 | sd_journal *j, | |
343 | const char *cursor, | |
344 | bool after_cursor, | |
345 | bool follow) { | |
346 | int fd, r, events; | |
347 | ||
348 | u->journal = j; | |
349 | ||
350 | sd_journal_set_data_threshold(j, 0); | |
351 | ||
352 | if (follow) { | |
353 | fd = sd_journal_get_fd(j); | |
eb56eb9b MS |
354 | if (fd < 0) |
355 | return log_error_errno(fd, "sd_journal_get_fd failed: %m"); | |
eacbb4d3 ZJS |
356 | |
357 | events = sd_journal_get_events(j); | |
358 | ||
359 | r = sd_journal_reliable_fd(j); | |
360 | assert(r >= 0); | |
361 | if (r > 0) | |
362 | u->timeout = -1; | |
363 | else | |
364 | u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT; | |
365 | ||
366 | r = sd_event_add_io(u->events, &u->input_event, | |
367 | fd, events, dispatch_journal_input, u); | |
eb56eb9b MS |
368 | if (r < 0) |
369 | return log_error_errno(r, "Failed to register input event: %m"); | |
eacbb4d3 ZJS |
370 | |
371 | log_debug("Listening for journal events on fd:%d, timeout %d", | |
372 | fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout); | |
373 | } else | |
374 | log_debug("Not listening for journal events."); | |
375 | ||
376 | if (cursor) { | |
377 | r = sd_journal_seek_cursor(j, cursor); | |
378 | if (r < 0) { | |
eb56eb9b MS |
379 | return log_error_errno(r, "Failed to seek to cursor %s: %m", |
380 | cursor); | |
eacbb4d3 ZJS |
381 | } |
382 | } | |
383 | ||
384 | return process_journal_input(u, 1 + !!after_cursor); | |
385 | } |