]> git.ipfire.org Git - thirdparty/systemd.git/blame - src/import/import-job.c
import: be less aggressive when allocating memory for downloaded payload
[thirdparty/systemd.git] / src / import / import-job.c
CommitLineData
56ebfaf1
LP
1/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3/***
4 This file is part of systemd.
5
6 Copyright 2015 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20***/
21
22#include <sys/xattr.h>
23
24#include "strv.h"
25#include "import-job.h"
26
27ImportJob* import_job_unref(ImportJob *j) {
28 if (!j)
29 return NULL;
30
31 curl_glue_remove_and_free(j->glue, j->curl);
32 curl_slist_free_all(j->request_header);
33
34 safe_close(j->disk_fd);
35
36 if (j->compressed == IMPORT_JOB_XZ)
37 lzma_end(&j->xz);
38 else if (j->compressed == IMPORT_JOB_GZIP)
39 inflateEnd(&j->gzip);
40
41 free(j->url);
42 free(j->etag);
43 strv_free(j->old_etags);
44 free(j->payload);
45
46 free(j);
47
48 return NULL;
49}
50
51DEFINE_TRIVIAL_CLEANUP_FUNC(ImportJob*, import_job_unref);
52
53static void import_job_finish(ImportJob *j, int ret) {
54 assert(j);
55
56 if (j->state == IMPORT_JOB_DONE ||
57 j->state == IMPORT_JOB_FAILED)
58 return;
59
68c913fd 60 if (ret == 0) {
56ebfaf1 61 j->state = IMPORT_JOB_DONE;
68c913fd
LP
62 log_info("Download of %s complete.", j->url);
63 } else {
56ebfaf1
LP
64 j->state = IMPORT_JOB_FAILED;
65 j->error = ret;
66 }
67
68 if (j->on_finished)
69 j->on_finished(j);
70}
71
72void import_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
73 ImportJob *j = NULL;
74 CURLcode code;
75 long status;
76 int r;
77
78 if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
79 return;
80
81 if (!j || j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED)
82 return;
83
84 if (result != CURLE_OK) {
85 log_error("Transfer failed: %s", curl_easy_strerror(result));
86 r = -EIO;
87 goto finish;
88 }
89
90 code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
91 if (code != CURLE_OK) {
92 log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
93 r = -EIO;
94 goto finish;
95 } else if (status == 304) {
96 log_info("Image already downloaded. Skipping download.");
97 r = 0;
98 goto finish;
99 } else if (status >= 300) {
100 log_error("HTTP request to %s failed with code %li.", j->url, status);
101 r = -EIO;
102 goto finish;
103 } else if (status < 200) {
104 log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
105 r = -EIO;
106 goto finish;
107 }
108
109 if (j->state != IMPORT_JOB_RUNNING) {
110 log_error("Premature connection termination.");
111 r = -EIO;
112 goto finish;
113 }
114
115 if (j->content_length != (uint64_t) -1 &&
116 j->content_length != j->written_compressed) {
117 log_error("Download truncated.");
118 r = -EIO;
119 goto finish;
120 }
121
122 if (j->disk_fd >= 0 && j->allow_sparse) {
123 /* Make sure the file size is right, in case the file was
124 * sparse and we just seeked for the last part */
125
126 if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
127 log_error_errno(errno, "Failed to truncate file: %m");
128 r = -errno;
129 goto finish;
130 }
131
132 if (j->etag)
133 (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
134 if (j->url)
135 (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);
136
137 if (j->mtime != 0) {
138 struct timespec ut[2];
139
140 timespec_store(&ut[0], j->mtime);
141 ut[1] = ut[0];
142 (void) futimens(j->disk_fd, ut);
143
144 (void) fd_setcrtime(j->disk_fd, j->mtime);
145 }
146 }
147
148 r = 0;
149
150finish:
151 import_job_finish(j, r);
152}
153
154
155static int import_job_write_uncompressed(ImportJob *j, void *p, size_t sz) {
156 ssize_t n;
157
158 assert(j);
159 assert(p);
160 assert(sz > 0);
161 assert(j->disk_fd >= 0);
162
163 if (j->written_uncompressed + sz < j->written_uncompressed) {
164 log_error("File too large, overflow");
165 return -EOVERFLOW;
166 }
167
168 if (j->written_uncompressed + sz > j->uncompressed_max) {
169 log_error("File overly large, refusing");
170 return -EFBIG;
171 }
172
173 if (j->disk_fd >= 0) {
174
175 if (j->allow_sparse)
176 n = sparse_write(j->disk_fd, p, sz, 64);
177 else
178 n = write(j->disk_fd, p, sz);
179 if (n < 0) {
180 log_error_errno(errno, "Failed to write file: %m");
181 return -errno;
182 }
183 if ((size_t) n < sz) {
184 log_error("Short write");
185 return -EIO;
186 }
187 } else {
188
189 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
190 return log_oom();
191
192 memcpy((uint8_t*) j->payload + j->payload_size, p, sz);
193 j->payload_size += sz;
194 }
195
196 j->written_uncompressed += sz;
197
198 return 0;
199}
200
201static int import_job_write_compressed(ImportJob *j, void *p, size_t sz) {
202 int r;
203
204 assert(j);
205 assert(p);
206 assert(sz > 0);
207 assert(j->disk_fd >= 0);
208
209 if (j->written_compressed + sz < j->written_compressed) {
210 log_error("File too large, overflow");
211 return -EOVERFLOW;
212 }
213
214 if (j->written_compressed + sz > j->compressed_max) {
215 log_error("File overly large, refusing.");
216 return -EFBIG;
217 }
218
219 if (j->content_length != (uint64_t) -1 &&
220 j->written_compressed + sz > j->content_length) {
221 log_error("Content length incorrect.");
222 return -EFBIG;
223 }
224
225 switch (j->compressed) {
226
227 case IMPORT_JOB_UNCOMPRESSED:
228 r = import_job_write_uncompressed(j, p, sz);
229 if (r < 0)
230 return r;
231
232 break;
233
234 case IMPORT_JOB_XZ:
235 j->xz.next_in = p;
236 j->xz.avail_in = sz;
237
238 while (j->xz.avail_in > 0) {
239 uint8_t buffer[16 * 1024];
240 lzma_ret lzr;
241
242 j->xz.next_out = buffer;
243 j->xz.avail_out = sizeof(buffer);
244
245 lzr = lzma_code(&j->xz, LZMA_RUN);
246 if (lzr != LZMA_OK && lzr != LZMA_STREAM_END) {
247 log_error("Decompression error.");
248 return -EIO;
249 }
250
251 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->xz.avail_out);
252 if (r < 0)
253 return r;
254 }
255
256 break;
257
258 case IMPORT_JOB_GZIP:
259 j->gzip.next_in = p;
260 j->gzip.avail_in = sz;
261
262 while (j->gzip.avail_in > 0) {
263 uint8_t buffer[16 * 1024];
264
265 j->gzip.next_out = buffer;
266 j->gzip.avail_out = sizeof(buffer);
267
268 r = inflate(&j->gzip, Z_NO_FLUSH);
269 if (r != Z_OK && r != Z_STREAM_END) {
270 log_error("Decompression error.");
271 return -EIO;
272 }
273
274 r = import_job_write_uncompressed(j, buffer, sizeof(buffer) - j->gzip.avail_out);
275 if (r < 0)
276 return r;
277 }
278
279 break;
280
281 default:
282 assert_not_reached("Unknown compression");
283 }
284
285 j->written_compressed += sz;
286
287 return 0;
288}
289
290static int import_job_open_disk(ImportJob *j) {
291 int r;
292
293 assert(j);
294
295 if (j->on_open_disk) {
296 r = j->on_open_disk(j);
297 if (r < 0)
298 return r;
299 }
300
301 if (j->disk_fd >= 0) {
302 /* Check if we can do sparse files */
303
304 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
305 j->allow_sparse = true;
306 else {
307 if (errno != ESPIPE)
308 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
309
310 j->allow_sparse = false;
311 }
312 }
313
314 return 0;
315}
316
317static int import_job_detect_compression(ImportJob *j) {
318 static const uint8_t xz_signature[] = {
319 0xfd, '7', 'z', 'X', 'Z', 0x00
320 };
321 static const uint8_t gzip_signature[] = {
322 0x1f, 0x8b
323 };
324
325 _cleanup_free_ uint8_t *stub = NULL;
326 size_t stub_size;
327
328 int r;
329
330 assert(j);
331
332 if (j->payload_size < MAX(sizeof(xz_signature), sizeof(gzip_signature)))
333 return 0;
334
335 if (memcmp(j->payload, xz_signature, sizeof(xz_signature)) == 0)
336 j->compressed = IMPORT_JOB_XZ;
337 else if (memcmp(j->payload, gzip_signature, sizeof(gzip_signature)) == 0)
338 j->compressed = IMPORT_JOB_GZIP;
339 else
340 j->compressed = IMPORT_JOB_UNCOMPRESSED;
341
342 log_debug("Stream is XZ compressed: %s", yes_no(j->compressed == IMPORT_JOB_XZ));
343 log_debug("Stream is GZIP compressed: %s", yes_no(j->compressed == IMPORT_JOB_GZIP));
344
345 if (j->compressed == IMPORT_JOB_XZ) {
346 lzma_ret xzr;
347
348 xzr = lzma_stream_decoder(&j->xz, UINT64_MAX, LZMA_TELL_UNSUPPORTED_CHECK);
349 if (xzr != LZMA_OK) {
350 log_error("Failed to initialize XZ decoder.");
351 return -EIO;
352 }
353 }
354 if (j->compressed == IMPORT_JOB_GZIP) {
355 r = inflateInit2(&j->gzip, 15+16);
356 if (r != Z_OK) {
357 log_error("Failed to initialize gzip decoder.");
358 return -EIO;
359 }
360 }
361
362 r = import_job_open_disk(j);
363 if (r < 0)
364 return r;
365
366 /* Now, take the payload we read so far, and decompress it */
367 stub = j->payload;
368 stub_size = j->payload_size;
369
370 j->payload = NULL;
371 j->payload_size = 0;
88a1aadc 372 j->payload_allocated = 0;
56ebfaf1
LP
373
374 j->state = IMPORT_JOB_RUNNING;
375
376 r = import_job_write_compressed(j, stub, stub_size);
377 if (r < 0)
378 return r;
379
380 return 0;
381}
382
383static size_t import_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
384 ImportJob *j = userdata;
385 size_t sz = size * nmemb;
386 int r;
387
388 assert(contents);
389 assert(j);
390
391 switch (j->state) {
392
393 case IMPORT_JOB_ANALYZING:
394 /* Let's first check what it actually is */
395
396 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
397 r = log_oom();
398 goto fail;
399 }
400
401 memcpy((uint8_t*) j->payload + j->payload_size, contents, sz);
402 j->payload_size += sz;
403
404 r = import_job_detect_compression(j);
405 if (r < 0)
406 goto fail;
407
408 break;
409
410 case IMPORT_JOB_RUNNING:
411
412 r = import_job_write_compressed(j, contents, sz);
413 if (r < 0)
414 goto fail;
415
416 break;
417
418 case IMPORT_JOB_DONE:
419 case IMPORT_JOB_FAILED:
420 r = -ESTALE;
421 goto fail;
422
423 default:
424 assert_not_reached("Impossible state.");
425 }
426
427 return sz;
428
429fail:
430 import_job_finish(j, r);
431 return 0;
432}
433
434static size_t import_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
435 ImportJob *j = userdata;
436 size_t sz = size * nmemb;
437 _cleanup_free_ char *length = NULL, *last_modified = NULL;
438 char *etag;
439 int r;
440
441 assert(contents);
442 assert(j);
443
444 if (j->state == IMPORT_JOB_DONE || j->state == IMPORT_JOB_FAILED) {
445 r = -ESTALE;
446 goto fail;
447 }
448
449 assert(j->state == IMPORT_JOB_ANALYZING);
450
451 r = curl_header_strdup(contents, sz, "ETag:", &etag);
452 if (r < 0) {
453 log_oom();
454 goto fail;
455 }
456 if (r > 0) {
457 free(j->etag);
458 j->etag = etag;
459
460 if (strv_contains(j->old_etags, j->etag)) {
461 log_info("Image already downloaded. Skipping download.");
462 import_job_finish(j, 0);
463 return sz;
464 }
465
466 return sz;
467 }
468
469 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
470 if (r < 0) {
471 log_oom();
472 goto fail;
473 }
474 if (r > 0) {
475 (void) safe_atou64(length, &j->content_length);
476
477 if (j->content_length != (uint64_t) -1) {
478 char bytes[FORMAT_BYTES_MAX];
479
480 if (j->content_length > j->compressed_max) {
481 log_error("Content too large.");
482 r = -EFBIG;
483 goto fail;
484 }
485
68c913fd 486 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
56ebfaf1
LP
487 }
488
489 return sz;
490 }
491
492 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
493 if (r < 0) {
494 log_oom();
495 goto fail;
496 }
497 if (r > 0) {
498 (void) curl_parse_http_time(last_modified, &j->mtime);
499 return sz;
500 }
501
502 return sz;
503
504fail:
505 import_job_finish(j, r);
506 return 0;
507}
508
509static int import_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
510 ImportJob *j = userdata;
511 unsigned percent;
512 usec_t n;
513
514 assert(j);
515
516 if (dltotal <= 0)
517 return 0;
518
519 percent = ((100 * dlnow) / dltotal);
520 n = now(CLOCK_MONOTONIC);
521
522 if (n > j->last_status_usec + USEC_PER_SEC &&
68c913fd
LP
523 percent != j->progress_percent &&
524 dlnow < dltotal) {
56ebfaf1
LP
525 char buf[FORMAT_TIMESPAN_MAX];
526
527 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
528 usec_t left, done;
529
530 done = n - j->start_usec;
531 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
532
533 log_info("Got %u%% of %s. %s left.", percent, j->url, format_timespan(buf, sizeof(buf), left, USEC_PER_SEC));
534 } else
535 log_info("Got %u%% of %s.", percent, j->url);
536
537 j->progress_percent = percent;
538 j->last_status_usec = n;
539 }
540
541 return 0;
542}
543
544int import_job_new(ImportJob **ret, const char *url, CurlGlue *glue, void *userdata) {
545 _cleanup_(import_job_unrefp) ImportJob *j = NULL;
546
547 assert(url);
548 assert(glue);
549 assert(ret);
550
551 j = new0(ImportJob, 1);
552 if (!j)
553 return -ENOMEM;
554
555 j->state = IMPORT_JOB_INIT;
556 j->disk_fd = -1;
557 j->userdata = userdata;
558 j->glue = glue;
559 j->content_length = (uint64_t) -1;
560 j->start_usec = now(CLOCK_MONOTONIC);
561 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
562
563 j->url = strdup(url);
564 if (!j->url)
565 return -ENOMEM;
566
567 *ret = j;
568 j = NULL;
569
570 return 0;
571}
572
573int import_job_begin(ImportJob *j) {
574 int r;
575
576 assert(j);
577
578 if (j->state != IMPORT_JOB_INIT)
579 return -EBUSY;
580
581 r = curl_glue_make(&j->curl, j->url, j);
582 if (r < 0)
583 return r;
584
585 if (!strv_isempty(j->old_etags)) {
586 _cleanup_free_ char *cc = NULL, *hdr = NULL;
587
588 cc = strv_join(j->old_etags, ", ");
589 if (!cc)
590 return -ENOMEM;
591
592 hdr = strappend("If-None-Match: ", cc);
593 if (!hdr)
594 return -ENOMEM;
595
596 j->request_header = curl_slist_new(hdr, NULL);
597 if (!j->request_header)
598 return -ENOMEM;
599
600 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
601 return -EIO;
602 }
603
604 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, import_job_write_callback) != CURLE_OK)
605 return -EIO;
606
607 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
608 return -EIO;
609
610 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, import_job_header_callback) != CURLE_OK)
611 return -EIO;
612
613 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
614 return -EIO;
615
616 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, import_job_progress_callback) != CURLE_OK)
617 return -EIO;
618
619 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
620 return -EIO;
621
622 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
623 return -EIO;
624
625 r = curl_glue_add(j->glue, j->curl);
626 if (r < 0)
627 return r;
628
629 j->state = IMPORT_JOB_ANALYZING;
630
631 return 0;
632}