]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/journal-remote/journal-upload-journal.c
util-lib: split out allocation calls into alloc-util.[ch]
[thirdparty/systemd.git] / src / journal-remote / journal-upload-journal.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2014 Zbigniew Jędrzejewski-Szmek
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <stdbool.h>
23
24 #include <curl/curl.h>
25
26 #include "alloc-util.h"
27 #include "journal-upload.h"
28 #include "log.h"
29 #include "utf8.h"
30 #include "util.h"
31
32 /**
33 * Write up to size bytes to buf. Return negative on error, and number of
34 * bytes written otherwise. The last case is a kind of an error too.
35 */
36 static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
37 int r;
38 size_t pos = 0;
39
40 assert(size <= SSIZE_MAX);
41
42 for (;;) {
43
44 switch(u->entry_state) {
45 case ENTRY_CURSOR: {
46 u->current_cursor = mfree(u->current_cursor);
47
48 r = sd_journal_get_cursor(u->journal, &u->current_cursor);
49 if (r < 0)
50 return log_error_errno(r, "Failed to get cursor: %m");
51
52 r = snprintf(buf + pos, size - pos,
53 "__CURSOR=%s\n", u->current_cursor);
54 if (pos + r > size)
55 /* not enough space */
56 return pos;
57
58 u->entry_state ++;
59
60 if (pos + r == size) {
61 /* exactly one character short, but we don't need it */
62 buf[size - 1] = '\n';
63 return size;
64 }
65
66 pos += r;
67 } /* fall through */
68
69 case ENTRY_REALTIME: {
70 usec_t realtime;
71
72 r = sd_journal_get_realtime_usec(u->journal, &realtime);
73 if (r < 0)
74 return log_error_errno(r, "Failed to get realtime timestamp: %m");
75
76 r = snprintf(buf + pos, size - pos,
77 "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime);
78 if (r + pos > size)
79 /* not enough space */
80 return pos;
81
82 u->entry_state ++;
83
84 if (r + pos == size) {
85 /* exactly one character short, but we don't need it */
86 buf[size - 1] = '\n';
87 return size;
88 }
89
90 pos += r;
91 } /* fall through */
92
93 case ENTRY_MONOTONIC: {
94 usec_t monotonic;
95 sd_id128_t boot_id;
96
97 r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id);
98 if (r < 0)
99 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
100
101 r = snprintf(buf + pos, size - pos,
102 "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic);
103 if (r + pos > size)
104 /* not enough space */
105 return pos;
106
107 u->entry_state ++;
108
109 if (r + pos == size) {
110 /* exactly one character short, but we don't need it */
111 buf[size - 1] = '\n';
112 return size;
113 }
114
115 pos += r;
116 } /* fall through */
117
118 case ENTRY_BOOT_ID: {
119 sd_id128_t boot_id;
120 char sid[33];
121
122 r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id);
123 if (r < 0)
124 return log_error_errno(r, "Failed to get monotonic timestamp: %m");
125
126 r = snprintf(buf + pos, size - pos,
127 "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid));
128 if (r + pos > size)
129 /* not enough space */
130 return pos;
131
132 u->entry_state ++;
133
134 if (r + pos == size) {
135 /* exactly one character short, but we don't need it */
136 buf[size - 1] = '\n';
137 return size;
138 }
139
140 pos += r;
141 } /* fall through */
142
143 case ENTRY_NEW_FIELD: {
144 u->field_pos = 0;
145
146 r = sd_journal_enumerate_data(u->journal,
147 &u->field_data,
148 &u->field_length);
149 if (r < 0)
150 return log_error_errno(r, "Failed to move to next field in entry: %m");
151 else if (r == 0) {
152 u->entry_state = ENTRY_OUTRO;
153 continue;
154 }
155
156 if (!utf8_is_printable_newline(u->field_data,
157 u->field_length, false)) {
158 u->entry_state = ENTRY_BINARY_FIELD_START;
159 continue;
160 }
161
162 u->entry_state ++;
163 } /* fall through */
164
165 case ENTRY_TEXT_FIELD:
166 case ENTRY_BINARY_FIELD: {
167 bool done;
168 size_t tocopy;
169
170 done = size - pos > u->field_length - u->field_pos;
171 if (done)
172 tocopy = u->field_length - u->field_pos;
173 else
174 tocopy = size - pos;
175
176 memcpy(buf + pos,
177 (char*) u->field_data + u->field_pos,
178 tocopy);
179
180 if (done) {
181 buf[pos + tocopy] = '\n';
182 pos += tocopy + 1;
183 u->entry_state = ENTRY_NEW_FIELD;
184 continue;
185 } else {
186 u->field_pos += tocopy;
187 return size;
188 }
189 }
190
191 case ENTRY_BINARY_FIELD_START: {
192 const char *c;
193 size_t len;
194
195 c = memchr(u->field_data, '=', u->field_length);
196 if (!c || c == u->field_data) {
197 log_error("Invalid field.");
198 return -EINVAL;
199 }
200
201 len = c - (const char*)u->field_data;
202
203 /* need space for label + '\n' */
204 if (size - pos < len + 1)
205 return pos;
206
207 memcpy(buf + pos, u->field_data, len);
208 buf[pos + len] = '\n';
209 pos += len + 1;
210
211 u->field_pos = len + 1;
212 u->entry_state ++;
213 } /* fall through */
214
215 case ENTRY_BINARY_FIELD_SIZE: {
216 uint64_t le64;
217
218 /* need space for uint64_t */
219 if (size - pos < 8)
220 return pos;
221
222 le64 = htole64(u->field_length - u->field_pos);
223 memcpy(buf + pos, &le64, 8);
224 pos += 8;
225
226 u->entry_state ++;
227 continue;
228 }
229
230 case ENTRY_OUTRO:
231 /* need space for '\n' */
232 if (size - pos < 1)
233 return pos;
234
235 buf[pos++] = '\n';
236 u->entry_state ++;
237 u->entries_sent ++;
238
239 return pos;
240
241 default:
242 assert_not_reached("WTF?");
243 }
244 }
245 assert_not_reached("WTF?");
246 }
247
248 static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
249 Uploader *u = userp;
250 int r;
251 sd_journal *j;
252 size_t filled = 0;
253 ssize_t w;
254
255 assert(u);
256 assert(nmemb <= SSIZE_MAX / size);
257
258 j = u->journal;
259
260 while (j && filled < size * nmemb) {
261 if (u->entry_state == ENTRY_DONE) {
262 r = sd_journal_next(j);
263 if (r < 0) {
264 log_error_errno(r, "Failed to move to next entry in journal: %m");
265 return CURL_READFUNC_ABORT;
266 } else if (r == 0) {
267 if (u->input_event)
268 log_debug("No more entries, waiting for journal.");
269 else {
270 log_info("No more entries, closing journal.");
271 close_journal_input(u);
272 }
273
274 u->uploading = false;
275
276 break;
277 }
278
279 u->entry_state = ENTRY_CURSOR;
280 }
281
282 w = write_entry((char*)buf + filled, size * nmemb - filled, u);
283 if (w < 0)
284 return CURL_READFUNC_ABORT;
285 filled += w;
286
287 if (filled == 0) {
288 log_error("Buffer space is too small to write entry.");
289 return CURL_READFUNC_ABORT;
290 } else if (u->entry_state != ENTRY_DONE)
291 /* This means that all available space was used up */
292 break;
293
294 log_debug("Entry %zu (%s) has been uploaded.",
295 u->entries_sent, u->current_cursor);
296 }
297
298 return filled;
299 }
300
301 void close_journal_input(Uploader *u) {
302 assert(u);
303
304 if (u->journal) {
305 log_debug("Closing journal input.");
306
307 sd_journal_close(u->journal);
308 u->journal = NULL;
309 }
310 u->timeout = 0;
311 }
312
313 static int process_journal_input(Uploader *u, int skip) {
314 int r;
315
316 r = sd_journal_next_skip(u->journal, skip);
317 if (r < 0)
318 return log_error_errno(r, "Failed to skip to next entry: %m");
319 else if (r < skip)
320 return 0;
321
322 /* have data */
323 u->entry_state = ENTRY_CURSOR;
324 return start_upload(u, journal_input_callback, u);
325 }
326
327 int check_journal_input(Uploader *u) {
328 if (u->input_event) {
329 int r;
330
331 r = sd_journal_process(u->journal);
332 if (r < 0) {
333 log_error_errno(r, "Failed to process journal: %m");
334 close_journal_input(u);
335 return r;
336 }
337
338 if (r == SD_JOURNAL_NOP)
339 return 0;
340 }
341
342 return process_journal_input(u, 1);
343 }
344
345 static int dispatch_journal_input(sd_event_source *event,
346 int fd,
347 uint32_t revents,
348 void *userp) {
349 Uploader *u = userp;
350
351 assert(u);
352
353 if (u->uploading) {
354 log_warning("dispatch_journal_input called when uploading, ignoring.");
355 return 0;
356 }
357
358 log_debug("Detected journal input, checking for new data.");
359 return check_journal_input(u);
360 }
361
362 int open_journal_for_upload(Uploader *u,
363 sd_journal *j,
364 const char *cursor,
365 bool after_cursor,
366 bool follow) {
367 int fd, r, events;
368
369 u->journal = j;
370
371 sd_journal_set_data_threshold(j, 0);
372
373 if (follow) {
374 fd = sd_journal_get_fd(j);
375 if (fd < 0)
376 return log_error_errno(fd, "sd_journal_get_fd failed: %m");
377
378 events = sd_journal_get_events(j);
379
380 r = sd_journal_reliable_fd(j);
381 assert(r >= 0);
382 if (r > 0)
383 u->timeout = -1;
384 else
385 u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT;
386
387 r = sd_event_add_io(u->events, &u->input_event,
388 fd, events, dispatch_journal_input, u);
389 if (r < 0)
390 return log_error_errno(r, "Failed to register input event: %m");
391
392 log_debug("Listening for journal events on fd:%d, timeout %d",
393 fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout);
394 } else
395 log_debug("Not listening for journal events.");
396
397 if (cursor) {
398 r = sd_journal_seek_cursor(j, cursor);
399 if (r < 0)
400 return log_error_errno(r, "Failed to seek to cursor %s: %m",
401 cursor);
402 }
403
404 return process_journal_input(u, 1 + !!after_cursor);
405 }