Source: git.ipfire.org — thirdparty/systemd.git, file src/import/pull-job.c
Commit: "importd: add new bus calls for importing local tar and raw images"
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2015 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "strv.h"
25 #include "machine-pool.h"
26 #include "pull-job.h"
27
28 PullJob* pull_job_unref(PullJob *j) {
29 if (!j)
30 return NULL;
31
32 curl_glue_remove_and_free(j->glue, j->curl);
33 curl_slist_free_all(j->request_header);
34
35 safe_close(j->disk_fd);
36
37 import_compress_free(&j->compress);
38
39 if (j->checksum_context)
40 gcry_md_close(j->checksum_context);
41
42 free(j->url);
43 free(j->etag);
44 strv_free(j->old_etags);
45 free(j->payload);
46 free(j->checksum);
47
48 free(j);
49
50 return NULL;
51 }
52
53 static void pull_job_finish(PullJob *j, int ret) {
54 assert(j);
55
56 if (j->state == PULL_JOB_DONE ||
57 j->state == PULL_JOB_FAILED)
58 return;
59
60 if (ret == 0) {
61 j->state = PULL_JOB_DONE;
62 j->progress_percent = 100;
63 log_info("Download of %s complete.", j->url);
64 } else {
65 j->state = PULL_JOB_FAILED;
66 j->error = ret;
67 }
68
69 if (j->on_finished)
70 j->on_finished(j);
71 }
72
/* Invoked by the curl glue when the transfer behind @curl has ended,
 * either successfully or with the error in @result. Validates the HTTP
 * status, verifies download length and checksum, finalizes the file on
 * disk, and transitions the job into a terminal state via
 * pull_job_finish(). */
void pull_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
        PullJob *j = NULL;
        CURLcode code;
        long status;
        int r;

        /* Recover the job object attached to the easy handle */
        if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, &j) != CURLE_OK)
                return;

        /* Ignore completions for jobs that already reached a terminal state */
        if (!j || j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED)
                return;

        if (result != CURLE_OK) {
                log_error("Transfer failed: %s", curl_easy_strerror(result));
                r = -EIO;
                goto finish;
        }

        code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
        if (code != CURLE_OK) {
                log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
                r = -EIO;
                goto finish;
        } else if (status == 304) {
                /* "Not Modified" in reply to our If-None-Match header: a
                 * cached copy matching one of the old ETags is current */
                log_info("Image already downloaded. Skipping download.");
                j->etag_exists = true;
                r = 0;
                goto finish;
        } else if (status >= 300) {
                log_error("HTTP request to %s failed with code %li.", j->url, status);
                r = -EIO;
                goto finish;
        } else if (status < 200) {
                log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
                r = -EIO;
                goto finish;
        }

        /* pull_job_detect_compression() flips the state to RUNNING once
         * payload processing has started; anything else here means the
         * connection ended before any payload arrived. */
        if (j->state != PULL_JOB_RUNNING) {
                log_error("Premature connection termination.");
                r = -EIO;
                goto finish;
        }

        /* (uint64_t) -1 means no Content-Length header was seen */
        if (j->content_length != (uint64_t) -1 &&
            j->content_length != j->written_compressed) {
                log_error("Download truncated.");
                r = -EIO;
                goto finish;
        }

        if (j->checksum_context) {
                uint8_t *k;

                k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
                if (!k) {
                        log_error("Failed to get checksum.");
                        r = -EIO;
                        goto finish;
                }

                /* Store the digest as a hex string for later verification */
                j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
                if (!j->checksum) {
                        r = log_oom();
                        goto finish;
                }

                log_debug("SHA256 of %s is %s.", j->url, j->checksum);
        }

        if (j->disk_fd >= 0 && j->allow_sparse) {
                /* Make sure the file size is right, in case the file was
                 * sparse and we just seeked for the last part */

                if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
                        r = log_error_errno(errno, "Failed to truncate file: %m");
                        goto finish;
                }

                /* Best-effort: remember ETag and URL as xattrs so a later
                 * pull can send If-None-Match and skip an unchanged image */
                if (j->etag)
                        (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
                if (j->url)
                        (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);

                if (j->mtime != 0) {
                        struct timespec ut[2];

                        /* Propagate the server's Last-Modified time to both
                         * atime and mtime of the local file */
                        timespec_store(&ut[0], j->mtime);
                        ut[1] = ut[0];
                        (void) futimens(j->disk_fd, ut);

                        (void) fd_setcrtime(j->disk_fd, j->mtime);
                }
        }

        r = 0;

finish:
        pull_job_finish(j, r);
}
173
174 static int pull_job_write_uncompressed(const void *p, size_t sz, void *userdata) {
175 PullJob *j = userdata;
176 ssize_t n;
177
178 assert(j);
179 assert(p);
180
181 if (sz <= 0)
182 return 0;
183
184 if (j->written_uncompressed + sz < j->written_uncompressed) {
185 log_error("File too large, overflow");
186 return -EOVERFLOW;
187 }
188
189 if (j->written_uncompressed + sz > j->uncompressed_max) {
190 log_error("File overly large, refusing");
191 return -EFBIG;
192 }
193
194 if (j->disk_fd >= 0) {
195
196 if (j->grow_machine_directory && j->written_since_last_grow >= GROW_INTERVAL_BYTES) {
197 j->written_since_last_grow = 0;
198 grow_machine_directory();
199 }
200
201 if (j->allow_sparse)
202 n = sparse_write(j->disk_fd, p, sz, 64);
203 else
204 n = write(j->disk_fd, p, sz);
205 if (n < 0)
206 return log_error_errno(errno, "Failed to write file: %m");
207 if ((size_t) n < sz) {
208 log_error("Short write");
209 return -EIO;
210 }
211 } else {
212
213 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
214 return log_oom();
215
216 memcpy(j->payload + j->payload_size, p, sz);
217 j->payload_size += sz;
218 }
219
220 j->written_uncompressed += sz;
221 j->written_since_last_grow += sz;
222
223 return 0;
224 }
225
226 static int pull_job_write_compressed(PullJob *j, void *p, size_t sz) {
227 int r;
228
229 assert(j);
230 assert(p);
231
232 if (sz <= 0)
233 return 0;
234
235 if (j->written_compressed + sz < j->written_compressed) {
236 log_error("File too large, overflow");
237 return -EOVERFLOW;
238 }
239
240 if (j->written_compressed + sz > j->compressed_max) {
241 log_error("File overly large, refusing.");
242 return -EFBIG;
243 }
244
245 if (j->content_length != (uint64_t) -1 &&
246 j->written_compressed + sz > j->content_length) {
247 log_error("Content length incorrect.");
248 return -EFBIG;
249 }
250
251 if (j->checksum_context)
252 gcry_md_write(j->checksum_context, p, sz);
253
254 r = import_uncompress(&j->compress, p, sz, pull_job_write_uncompressed, j);
255 if (r < 0)
256 return r;
257
258 j->written_compressed += sz;
259
260 return 0;
261 }
262
263 static int pull_job_open_disk(PullJob *j) {
264 int r;
265
266 assert(j);
267
268 if (j->on_open_disk) {
269 r = j->on_open_disk(j);
270 if (r < 0)
271 return r;
272 }
273
274 if (j->disk_fd >= 0) {
275 /* Check if we can do sparse files */
276
277 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
278 j->allow_sparse = true;
279 else {
280 if (errno != ESPIPE)
281 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
282
283 j->allow_sparse = false;
284 }
285 }
286
287 if (j->calc_checksum) {
288 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
289 log_error("Failed to initialize hash context.");
290 return -EIO;
291 }
292 }
293
294 return 0;
295 }
296
/* Tries to identify the compression format from the payload buffered so
 * far. Returns 0 both when more data is needed and when detection
 * succeeded; on success the job is switched to PULL_JOB_RUNNING and the
 * buffered bytes are replayed through the decompression path. */
static int pull_job_detect_compression(PullJob *j) {
        _cleanup_free_ uint8_t *stub = NULL;
        size_t stub_size;

        int r;

        assert(j);

        /* r == 0 means: not enough bytes yet to decide, keep buffering */
        r = import_uncompress_detect(&j->compress, j->payload, j->payload_size);
        if (r < 0)
                return log_error_errno(r, "Failed to initialize compressor: %m");
        if (r == 0)
                return 0;

        log_debug("Stream is compressed: %s", import_compress_type_to_string(j->compress.type));

        r = pull_job_open_disk(j);
        if (r < 0)
                return r;

        /* Now, take the payload we read so far, and decompress it.
         * Ownership of the buffered bytes moves to @stub (freed on return
         * via _cleanup_free_); the job's buffer is reset so that
         * pull_job_write_uncompressed() can start a fresh one. */
        stub = j->payload;
        stub_size = j->payload_size;

        j->payload = NULL;
        j->payload_size = 0;
        j->payload_allocated = 0;

        /* Must happen before replaying, so the write path treats the data
         * as regular payload rather than sniffing input */
        j->state = PULL_JOB_RUNNING;

        r = pull_job_write_compressed(j, stub, stub_size);
        if (r < 0)
                return r;

        return 0;
}
333
334 static size_t pull_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
335 PullJob *j = userdata;
336 size_t sz = size * nmemb;
337 int r;
338
339 assert(contents);
340 assert(j);
341
342 switch (j->state) {
343
344 case PULL_JOB_ANALYZING:
345 /* Let's first check what it actually is */
346
347 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
348 r = log_oom();
349 goto fail;
350 }
351
352 memcpy(j->payload + j->payload_size, contents, sz);
353 j->payload_size += sz;
354
355 r = pull_job_detect_compression(j);
356 if (r < 0)
357 goto fail;
358
359 break;
360
361 case PULL_JOB_RUNNING:
362
363 r = pull_job_write_compressed(j, contents, sz);
364 if (r < 0)
365 goto fail;
366
367 break;
368
369 case PULL_JOB_DONE:
370 case PULL_JOB_FAILED:
371 r = -ESTALE;
372 goto fail;
373
374 default:
375 assert_not_reached("Impossible state.");
376 }
377
378 return sz;
379
380 fail:
381 pull_job_finish(j, r);
382 return 0;
383 }
384
/* curl header callback: invoked once per response header line during the
 * ANALYZING phase. Extracts ETag, Content-Length and Last-Modified, and
 * forwards any other header to the job's on_header handler. Returns the
 * number of bytes consumed, or 0 to abort the transfer after marking the
 * job failed. */
static size_t pull_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
        PullJob *j = userdata;
        size_t sz = size * nmemb;
        _cleanup_free_ char *length = NULL, *last_modified = NULL;
        char *etag;
        int r;

        assert(contents);
        assert(j);

        /* Headers arriving after the job ended are stale */
        if (j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED) {
                r = -ESTALE;
                goto fail;
        }

        assert(j->state == PULL_JOB_ANALYZING);

        r = curl_header_strdup(contents, sz, "ETag:", &etag);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Take ownership of the freshly allocated ETag string */
                free(j->etag);
                j->etag = etag;

                /* If the server's ETag matches one of the versions we
                 * already have, there is nothing to download */
                if (strv_contains(j->old_etags, j->etag)) {
                        log_info("Image already downloaded. Skipping download.");
                        j->etag_exists = true;
                        pull_job_finish(j, 0);
                        return sz;
                }

                return sz;
        }

        r = curl_header_strdup(contents, sz, "Content-Length:", &length);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Parse errors leave content_length at (uint64_t) -1, i.e.
                 * "unknown" */
                (void) safe_atou64(length, &j->content_length);

                if (j->content_length != (uint64_t) -1) {
                        char bytes[FORMAT_BYTES_MAX];

                        /* Reject oversized downloads before transferring them */
                        if (j->content_length > j->compressed_max) {
                                log_error("Content too large.");
                                r = -EFBIG;
                                goto fail;
                        }

                        log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
                }

                return sz;
        }

        r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Best-effort: a malformed date simply leaves mtime at 0 */
                (void) curl_parse_http_time(last_modified, &j->mtime);
                return sz;
        }

        /* Any other header is passed on to the job owner, if interested */
        if (j->on_header) {
                r = j->on_header(j, contents, sz);
                if (r < 0)
                        goto fail;
        }

        return sz;

fail:
        pull_job_finish(j, r);
        return 0;
}
466
467 static int pull_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
468 PullJob *j = userdata;
469 unsigned percent;
470 usec_t n;
471
472 assert(j);
473
474 if (dltotal <= 0)
475 return 0;
476
477 percent = ((100 * dlnow) / dltotal);
478 n = now(CLOCK_MONOTONIC);
479
480 if (n > j->last_status_usec + USEC_PER_SEC &&
481 percent != j->progress_percent &&
482 dlnow < dltotal) {
483 char buf[FORMAT_TIMESPAN_MAX];
484
485 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
486 char y[FORMAT_BYTES_MAX];
487 usec_t left, done;
488
489 done = n - j->start_usec;
490 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
491
492 log_info("Got %u%% of %s. %s left at %s/s.",
493 percent,
494 j->url,
495 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
496 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
497 } else
498 log_info("Got %u%% of %s.", percent, j->url);
499
500 j->progress_percent = percent;
501 j->last_status_usec = n;
502
503 if (j->on_progress)
504 j->on_progress(j);
505 }
506
507 return 0;
508 }
509
510 int pull_job_new(PullJob **ret, const char *url, CurlGlue *glue, void *userdata) {
511 _cleanup_(pull_job_unrefp) PullJob *j = NULL;
512
513 assert(url);
514 assert(glue);
515 assert(ret);
516
517 j = new0(PullJob, 1);
518 if (!j)
519 return -ENOMEM;
520
521 j->state = PULL_JOB_INIT;
522 j->disk_fd = -1;
523 j->userdata = userdata;
524 j->glue = glue;
525 j->content_length = (uint64_t) -1;
526 j->start_usec = now(CLOCK_MONOTONIC);
527 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
528
529 j->url = strdup(url);
530 if (!j->url)
531 return -ENOMEM;
532
533 *ret = j;
534 j = NULL;
535
536 return 0;
537 }
538
/* Starts a job created with pull_job_new(): allocates the curl easy
 * handle, installs the write/header/progress callbacks, optionally adds
 * an If-None-Match request header built from known old ETags, and hands
 * the handle to the glue's event loop. May only be called once per job;
 * returns -EBUSY if the job was already started. */
int pull_job_begin(PullJob *j) {
        int r;

        assert(j);

        if (j->state != PULL_JOB_INIT)
                return -EBUSY;

        /* Enlarge the machine pool up front, before data starts flowing */
        if (j->grow_machine_directory)
                grow_machine_directory();

        r = curl_glue_make(&j->curl, j->url, j);
        if (r < 0)
                return r;

        /* If we know ETags of earlier downloads, send them along so the
         * server can reply with 304 and we can skip the transfer */
        if (!strv_isempty(j->old_etags)) {
                _cleanup_free_ char *cc = NULL, *hdr = NULL;

                cc = strv_join(j->old_etags, ", ");
                if (!cc)
                        return -ENOMEM;

                hdr = strappend("If-None-Match: ", cc);
                if (!hdr)
                        return -ENOMEM;

                /* Either start a new header list or append to the one the
                 * owner may have provided already */
                if (!j->request_header) {
                        j->request_header = curl_slist_new(hdr, NULL);
                        if (!j->request_header)
                                return -ENOMEM;
                } else {
                        struct curl_slist *l;

                        l = curl_slist_append(j->request_header, hdr);
                        if (!l)
                                return -ENOMEM;

                        j->request_header = l;
                }
        }

        if (j->request_header) {
                if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
                        return -EIO;
        }

        if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, pull_job_write_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, pull_job_header_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, pull_job_progress_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
                return -EIO;

        /* NOPROGRESS=0 is what actually enables the XFERINFO callback */
        if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
                return -EIO;

        r = curl_glue_add(j->glue, j->curl);
        if (r < 0)
                return r;

        j->state = PULL_JOB_ANALYZING;

        return 0;
}