]>
Commit | Line | Data |
---|---|---|
b5efdb8a LP |
1 | /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ |
2 | ||
3 | /*** | |
4 | This file is part of systemd. | |
5 | ||
6 | Copyright 2014 Zbigniew Jędrzejewski-Szmek | |
7 | ||
8 | systemd is free software; you can redistribute it and/or modify it | |
9 | under the terms of the GNU Lesser General Public License as published by | |
10 | the Free Software Foundation; either version 2.1 of the License, or | |
11 | (at your option) any later version. | |
12 | ||
13 | systemd is distributed in the hope that it will be useful, but | |
14 | WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
16 | Lesser General Public License for more details. | |
17 | ||
18 | You should have received a copy of the GNU Lesser General Public License | |
19 | along with systemd; If not, see <http://www.gnu.org/licenses/>. | |
20 | ***/ | |
21 | ||
eacbb4d3 ZJS |
22 | #include <stdbool.h> |
23 | ||
24 | #include <curl/curl.h> | |
25 | ||
b5efdb8a LP |
26 | #include "alloc-util.h" |
27 | #include "journal-upload.h" | |
eacbb4d3 ZJS |
28 | #include "log.h" |
29 | #include "utf8.h" | |
b5efdb8a | 30 | #include "util.h" |
eacbb4d3 ZJS |
31 | |
32 | /** | |
33 | * Write up to size bytes to buf. Return negative on error, and number of | |
34 | * bytes written otherwise. The last case is a kind of an error too. | |
35 | */ | |
36 | static ssize_t write_entry(char *buf, size_t size, Uploader *u) { | |
37 | int r; | |
38 | size_t pos = 0; | |
39 | ||
40 | assert(size <= SSIZE_MAX); | |
41 | ||
57255510 | 42 | for (;;) { |
eacbb4d3 ZJS |
43 | |
44 | switch(u->entry_state) { | |
45 | case ENTRY_CURSOR: { | |
a1e58e8e | 46 | u->current_cursor = mfree(u->current_cursor); |
eacbb4d3 | 47 | |
722b6795 | 48 | r = sd_journal_get_cursor(u->journal, &u->current_cursor); |
eb56eb9b MS |
49 | if (r < 0) |
50 | return log_error_errno(r, "Failed to get cursor: %m"); | |
eacbb4d3 ZJS |
51 | |
52 | r = snprintf(buf + pos, size - pos, | |
722b6795 | 53 | "__CURSOR=%s\n", u->current_cursor); |
eacbb4d3 ZJS |
54 | if (pos + r > size) |
55 | /* not enough space */ | |
56 | return pos; | |
57 | ||
58 | u->entry_state ++; | |
59 | ||
60 | if (pos + r == size) { | |
61 | /* exactly one character short, but we don't need it */ | |
62 | buf[size - 1] = '\n'; | |
63 | return size; | |
64 | } | |
65 | ||
66 | pos += r; | |
67 | } /* fall through */ | |
68 | ||
69 | case ENTRY_REALTIME: { | |
70 | usec_t realtime; | |
71 | ||
72 | r = sd_journal_get_realtime_usec(u->journal, &realtime); | |
eb56eb9b MS |
73 | if (r < 0) |
74 | return log_error_errno(r, "Failed to get realtime timestamp: %m"); | |
eacbb4d3 ZJS |
75 | |
76 | r = snprintf(buf + pos, size - pos, | |
77 | "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime); | |
78 | if (r + pos > size) | |
79 | /* not enough space */ | |
80 | return pos; | |
81 | ||
82 | u->entry_state ++; | |
83 | ||
84 | if (r + pos == size) { | |
85 | /* exactly one character short, but we don't need it */ | |
86 | buf[size - 1] = '\n'; | |
87 | return size; | |
88 | } | |
89 | ||
90 | pos += r; | |
91 | } /* fall through */ | |
92 | ||
93 | case ENTRY_MONOTONIC: { | |
94 | usec_t monotonic; | |
95 | sd_id128_t boot_id; | |
96 | ||
97 | r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); | |
eb56eb9b MS |
98 | if (r < 0) |
99 | return log_error_errno(r, "Failed to get monotonic timestamp: %m"); | |
eacbb4d3 ZJS |
100 | |
101 | r = snprintf(buf + pos, size - pos, | |
102 | "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic); | |
103 | if (r + pos > size) | |
104 | /* not enough space */ | |
105 | return pos; | |
106 | ||
107 | u->entry_state ++; | |
108 | ||
109 | if (r + pos == size) { | |
110 | /* exactly one character short, but we don't need it */ | |
111 | buf[size - 1] = '\n'; | |
112 | return size; | |
113 | } | |
114 | ||
115 | pos += r; | |
116 | } /* fall through */ | |
117 | ||
118 | case ENTRY_BOOT_ID: { | |
119 | sd_id128_t boot_id; | |
120 | char sid[33]; | |
121 | ||
122 | r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); | |
eb56eb9b MS |
123 | if (r < 0) |
124 | return log_error_errno(r, "Failed to get monotonic timestamp: %m"); | |
eacbb4d3 ZJS |
125 | |
126 | r = snprintf(buf + pos, size - pos, | |
127 | "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid)); | |
5ffa8c81 | 128 | if (r + pos > size) |
eacbb4d3 ZJS |
129 | /* not enough space */ |
130 | return pos; | |
131 | ||
132 | u->entry_state ++; | |
133 | ||
134 | if (r + pos == size) { | |
135 | /* exactly one character short, but we don't need it */ | |
136 | buf[size - 1] = '\n'; | |
137 | return size; | |
138 | } | |
139 | ||
140 | pos += r; | |
141 | } /* fall through */ | |
142 | ||
143 | case ENTRY_NEW_FIELD: { | |
144 | u->field_pos = 0; | |
145 | ||
146 | r = sd_journal_enumerate_data(u->journal, | |
147 | &u->field_data, | |
148 | &u->field_length); | |
eb56eb9b MS |
149 | if (r < 0) |
150 | return log_error_errno(r, "Failed to move to next field in entry: %m"); | |
151 | else if (r == 0) { | |
eacbb4d3 ZJS |
152 | u->entry_state = ENTRY_OUTRO; |
153 | continue; | |
154 | } | |
155 | ||
156 | if (!utf8_is_printable_newline(u->field_data, | |
157 | u->field_length, false)) { | |
158 | u->entry_state = ENTRY_BINARY_FIELD_START; | |
159 | continue; | |
160 | } | |
161 | ||
162 | u->entry_state ++; | |
163 | } /* fall through */ | |
164 | ||
165 | case ENTRY_TEXT_FIELD: | |
166 | case ENTRY_BINARY_FIELD: { | |
167 | bool done; | |
168 | size_t tocopy; | |
169 | ||
170 | done = size - pos > u->field_length - u->field_pos; | |
171 | if (done) | |
172 | tocopy = u->field_length - u->field_pos; | |
173 | else | |
174 | tocopy = size - pos; | |
175 | ||
176 | memcpy(buf + pos, | |
177 | (char*) u->field_data + u->field_pos, | |
178 | tocopy); | |
179 | ||
180 | if (done) { | |
181 | buf[pos + tocopy] = '\n'; | |
182 | pos += tocopy + 1; | |
183 | u->entry_state = ENTRY_NEW_FIELD; | |
184 | continue; | |
185 | } else { | |
186 | u->field_pos += tocopy; | |
187 | return size; | |
188 | } | |
189 | } | |
190 | ||
191 | case ENTRY_BINARY_FIELD_START: { | |
192 | const char *c; | |
193 | size_t len; | |
194 | ||
195 | c = memchr(u->field_data, '=', u->field_length); | |
196 | if (!c || c == u->field_data) { | |
197 | log_error("Invalid field."); | |
198 | return -EINVAL; | |
199 | } | |
200 | ||
201 | len = c - (const char*)u->field_data; | |
202 | ||
203 | /* need space for label + '\n' */ | |
204 | if (size - pos < len + 1) | |
205 | return pos; | |
206 | ||
207 | memcpy(buf + pos, u->field_data, len); | |
208 | buf[pos + len] = '\n'; | |
209 | pos += len + 1; | |
210 | ||
211 | u->field_pos = len + 1; | |
212 | u->entry_state ++; | |
213 | } /* fall through */ | |
214 | ||
215 | case ENTRY_BINARY_FIELD_SIZE: { | |
216 | uint64_t le64; | |
217 | ||
218 | /* need space for uint64_t */ | |
219 | if (size - pos < 8) | |
220 | return pos; | |
221 | ||
222 | le64 = htole64(u->field_length - u->field_pos); | |
223 | memcpy(buf + pos, &le64, 8); | |
224 | pos += 8; | |
225 | ||
226 | u->entry_state ++; | |
227 | continue; | |
228 | } | |
229 | ||
230 | case ENTRY_OUTRO: | |
231 | /* need space for '\n' */ | |
232 | if (size - pos < 1) | |
233 | return pos; | |
234 | ||
235 | buf[pos++] = '\n'; | |
236 | u->entry_state ++; | |
237 | u->entries_sent ++; | |
238 | ||
239 | return pos; | |
240 | ||
241 | default: | |
242 | assert_not_reached("WTF?"); | |
243 | } | |
244 | } | |
245 | assert_not_reached("WTF?"); | |
246 | } | |
247 | ||
248 | static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { | |
249 | Uploader *u = userp; | |
250 | int r; | |
251 | sd_journal *j; | |
252 | size_t filled = 0; | |
253 | ssize_t w; | |
254 | ||
255 | assert(u); | |
256 | assert(nmemb <= SSIZE_MAX / size); | |
257 | ||
258 | j = u->journal; | |
259 | ||
260 | while (j && filled < size * nmemb) { | |
261 | if (u->entry_state == ENTRY_DONE) { | |
262 | r = sd_journal_next(j); | |
263 | if (r < 0) { | |
c33b3297 | 264 | log_error_errno(r, "Failed to move to next entry in journal: %m"); |
eacbb4d3 ZJS |
265 | return CURL_READFUNC_ABORT; |
266 | } else if (r == 0) { | |
267 | if (u->input_event) | |
268 | log_debug("No more entries, waiting for journal."); | |
269 | else { | |
270 | log_info("No more entries, closing journal."); | |
271 | close_journal_input(u); | |
272 | } | |
273 | ||
274 | u->uploading = false; | |
275 | ||
276 | break; | |
277 | } | |
278 | ||
279 | u->entry_state = ENTRY_CURSOR; | |
280 | } | |
281 | ||
282 | w = write_entry((char*)buf + filled, size * nmemb - filled, u); | |
283 | if (w < 0) | |
284 | return CURL_READFUNC_ABORT; | |
285 | filled += w; | |
286 | ||
287 | if (filled == 0) { | |
288 | log_error("Buffer space is too small to write entry."); | |
289 | return CURL_READFUNC_ABORT; | |
290 | } else if (u->entry_state != ENTRY_DONE) | |
291 | /* This means that all available space was used up */ | |
292 | break; | |
293 | ||
294 | log_debug("Entry %zu (%s) has been uploaded.", | |
722b6795 | 295 | u->entries_sent, u->current_cursor); |
eacbb4d3 ZJS |
296 | } |
297 | ||
298 | return filled; | |
299 | } | |
300 | ||
301 | void close_journal_input(Uploader *u) { | |
302 | assert(u); | |
303 | ||
304 | if (u->journal) { | |
305 | log_debug("Closing journal input."); | |
306 | ||
307 | sd_journal_close(u->journal); | |
308 | u->journal = NULL; | |
309 | } | |
310 | u->timeout = 0; | |
311 | } | |
312 | ||
313 | static int process_journal_input(Uploader *u, int skip) { | |
314 | int r; | |
315 | ||
316 | r = sd_journal_next_skip(u->journal, skip); | |
eb56eb9b MS |
317 | if (r < 0) |
318 | return log_error_errno(r, "Failed to skip to next entry: %m"); | |
319 | else if (r < skip) | |
eacbb4d3 ZJS |
320 | return 0; |
321 | ||
322 | /* have data */ | |
323 | u->entry_state = ENTRY_CURSOR; | |
324 | return start_upload(u, journal_input_callback, u); | |
325 | } | |
326 | ||
327 | int check_journal_input(Uploader *u) { | |
328 | if (u->input_event) { | |
329 | int r; | |
330 | ||
331 | r = sd_journal_process(u->journal); | |
332 | if (r < 0) { | |
da927ba9 | 333 | log_error_errno(r, "Failed to process journal: %m"); |
eacbb4d3 ZJS |
334 | close_journal_input(u); |
335 | return r; | |
336 | } | |
337 | ||
338 | if (r == SD_JOURNAL_NOP) | |
339 | return 0; | |
340 | } | |
341 | ||
342 | return process_journal_input(u, 1); | |
343 | } | |
344 | ||
345 | static int dispatch_journal_input(sd_event_source *event, | |
346 | int fd, | |
347 | uint32_t revents, | |
348 | void *userp) { | |
349 | Uploader *u = userp; | |
350 | ||
351 | assert(u); | |
352 | ||
353 | if (u->uploading) { | |
354 | log_warning("dispatch_journal_input called when uploading, ignoring."); | |
355 | return 0; | |
356 | } | |
357 | ||
358 | log_debug("Detected journal input, checking for new data."); | |
359 | return check_journal_input(u); | |
360 | } | |
361 | ||
362 | int open_journal_for_upload(Uploader *u, | |
363 | sd_journal *j, | |
364 | const char *cursor, | |
365 | bool after_cursor, | |
366 | bool follow) { | |
367 | int fd, r, events; | |
368 | ||
369 | u->journal = j; | |
370 | ||
371 | sd_journal_set_data_threshold(j, 0); | |
372 | ||
373 | if (follow) { | |
374 | fd = sd_journal_get_fd(j); | |
eb56eb9b MS |
375 | if (fd < 0) |
376 | return log_error_errno(fd, "sd_journal_get_fd failed: %m"); | |
eacbb4d3 ZJS |
377 | |
378 | events = sd_journal_get_events(j); | |
379 | ||
380 | r = sd_journal_reliable_fd(j); | |
381 | assert(r >= 0); | |
382 | if (r > 0) | |
383 | u->timeout = -1; | |
384 | else | |
385 | u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT; | |
386 | ||
387 | r = sd_event_add_io(u->events, &u->input_event, | |
388 | fd, events, dispatch_journal_input, u); | |
eb56eb9b MS |
389 | if (r < 0) |
390 | return log_error_errno(r, "Failed to register input event: %m"); | |
eacbb4d3 ZJS |
391 | |
392 | log_debug("Listening for journal events on fd:%d, timeout %d", | |
393 | fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout); | |
394 | } else | |
395 | log_debug("Not listening for journal events."); | |
396 | ||
397 | if (cursor) { | |
398 | r = sd_journal_seek_cursor(j, cursor); | |
ece174c5 | 399 | if (r < 0) |
eb56eb9b MS |
400 | return log_error_errno(r, "Failed to seek to cursor %s: %m", |
401 | cursor); | |
eacbb4d3 ZJS |
402 | } |
403 | ||
404 | return process_journal_input(u, 1 + !!after_cursor); | |
405 | } |