]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/import/pull-job.c
fd058777050da66b5fa538b0f7b10056d6e685ec
[thirdparty/systemd.git] / src / import / pull-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2015 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "fd-util.h"
25 #include "io-util.h"
26 #include "machine-pool.h"
27 #include "parse-util.h"
28 #include "pull-job.h"
29 #include "string-util.h"
30 #include "strv.h"
31
32 PullJob* pull_job_unref(PullJob *j) {
33 if (!j)
34 return NULL;
35
36 curl_glue_remove_and_free(j->glue, j->curl);
37 curl_slist_free_all(j->request_header);
38
39 safe_close(j->disk_fd);
40
41 import_compress_free(&j->compress);
42
43 if (j->checksum_context)
44 gcry_md_close(j->checksum_context);
45
46 free(j->url);
47 free(j->etag);
48 strv_free(j->old_etags);
49 free(j->payload);
50 free(j->checksum);
51
52 free(j);
53
54 return NULL;
55 }
56
57 static void pull_job_finish(PullJob *j, int ret) {
58 assert(j);
59
60 if (j->state == PULL_JOB_DONE ||
61 j->state == PULL_JOB_FAILED)
62 return;
63
64 if (ret == 0) {
65 j->state = PULL_JOB_DONE;
66 j->progress_percent = 100;
67 log_info("Download of %s complete.", j->url);
68 } else {
69 j->state = PULL_JOB_FAILED;
70 j->error = ret;
71 }
72
73 if (j->on_finished)
74 j->on_finished(j);
75 }
76
/* Invoked by the curl glue when the transfer for this job terminated,
 * successfully or otherwise. Validates the HTTP status, verifies length
 * and checksum, fixes up the output file, and finishes the job. */
void pull_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
        PullJob *j = NULL;
        CURLcode code;
        long status;
        int r;

        /* Recover the job object stashed in the easy handle's private data */
        if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, (char **)&j) != CURLE_OK)
                return;

        if (!j || j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED)
                return;

        if (result != CURLE_OK) {
                log_error("Transfer failed: %s", curl_easy_strerror(result));
                r = -EIO;
                goto finish;
        }

        code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
        if (code != CURLE_OK) {
                log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
                r = -EIO;
                goto finish;
        } else if (status == 304) {
                /* 304 Not Modified: our If-None-Match etag matched on the server */
                log_info("Image already downloaded. Skipping download.");
                j->etag_exists = true;
                r = 0;
                goto finish;
        } else if (status >= 300) {
                log_error("HTTP request to %s failed with code %li.", j->url, status);
                r = -EIO;
                goto finish;
        } else if (status < 200) {
                log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
                r = -EIO;
                goto finish;
        }

        /* The write callback only moves us to RUNNING once payload data
         * arrived; any other state here means the connection was cut short. */
        if (j->state != PULL_JOB_RUNNING) {
                log_error("Premature connection termination.");
                r = -EIO;
                goto finish;
        }

        /* If the server announced a Content-Length, insist we received all of it */
        if (j->content_length != (uint64_t) -1 &&
            j->content_length != j->written_compressed) {
                log_error("Download truncated.");
                r = -EIO;
                goto finish;
        }

        if (j->checksum_context) {
                uint8_t *k;

                /* Finalize the SHA256 digest over the compressed stream */
                k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
                if (!k) {
                        log_error("Failed to get checksum.");
                        r = -EIO;
                        goto finish;
                }

                j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
                if (!j->checksum) {
                        r = log_oom();
                        goto finish;
                }

                log_debug("SHA256 of %s is %s.", j->url, j->checksum);
        }

        if (j->disk_fd >= 0 && j->allow_sparse) {
                /* Make sure the file size is right, in case the file was
                 * sparse and we just seeked for the last part */

                if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
                        r = log_error_errno(errno, "Failed to truncate file: %m");
                        goto finish;
                }

                /* Record provenance in xattrs; best effort, failures ignored */
                if (j->etag)
                        (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
                if (j->url)
                        (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);

                /* Propagate the server-side modification time, if we got one */
                if (j->mtime != 0) {
                        struct timespec ut[2];

                        timespec_store(&ut[0], j->mtime);
                        ut[1] = ut[0];
                        (void) futimens(j->disk_fd, ut);

                        (void) fd_setcrtime(j->disk_fd, j->mtime);
                }
        }

        r = 0;

finish:
        pull_job_finish(j, r);
}
177
178 static int pull_job_write_uncompressed(const void *p, size_t sz, void *userdata) {
179 PullJob *j = userdata;
180 ssize_t n;
181
182 assert(j);
183 assert(p);
184
185 if (sz <= 0)
186 return 0;
187
188 if (j->written_uncompressed + sz < j->written_uncompressed) {
189 log_error("File too large, overflow");
190 return -EOVERFLOW;
191 }
192
193 if (j->written_uncompressed + sz > j->uncompressed_max) {
194 log_error("File overly large, refusing");
195 return -EFBIG;
196 }
197
198 if (j->disk_fd >= 0) {
199
200 if (j->grow_machine_directory && j->written_since_last_grow >= GROW_INTERVAL_BYTES) {
201 j->written_since_last_grow = 0;
202 grow_machine_directory();
203 }
204
205 if (j->allow_sparse)
206 n = sparse_write(j->disk_fd, p, sz, 64);
207 else
208 n = write(j->disk_fd, p, sz);
209 if (n < 0)
210 return log_error_errno(errno, "Failed to write file: %m");
211 if ((size_t) n < sz) {
212 log_error("Short write");
213 return -EIO;
214 }
215 } else {
216
217 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
218 return log_oom();
219
220 memcpy(j->payload + j->payload_size, p, sz);
221 j->payload_size += sz;
222 }
223
224 j->written_uncompressed += sz;
225 j->written_since_last_grow += sz;
226
227 return 0;
228 }
229
/* Feeds a chunk of (possibly compressed) wire data into the job:
 * enforces size limits, updates the checksum, and pushes the data
 * through the decompressor into pull_job_write_uncompressed(). */
static int pull_job_write_compressed(PullJob *j, void *p, size_t sz) {
        int r;

        assert(j);
        assert(p);

        if (sz <= 0)
                return 0;

        /* Refuse writes that would wrap the 64bit byte counter */
        if (j->written_compressed + sz < j->written_compressed) {
                log_error("File too large, overflow");
                return -EOVERFLOW;
        }

        if (j->written_compressed + sz > j->compressed_max) {
                log_error("File overly large, refusing.");
                return -EFBIG;
        }

        /* The server must not send more than it announced in Content-Length */
        if (j->content_length != (uint64_t) -1 &&
            j->written_compressed + sz > j->content_length) {
                log_error("Content length incorrect.");
                return -EFBIG;
        }

        /* The checksum is calculated over the compressed wire format */
        if (j->checksum_context)
                gcry_md_write(j->checksum_context, p, sz);

        r = import_uncompress(&j->compress, p, sz, pull_job_write_uncompressed, j);
        if (r < 0)
                return r;

        j->written_compressed += sz;

        return 0;
}
266
267 static int pull_job_open_disk(PullJob *j) {
268 int r;
269
270 assert(j);
271
272 if (j->on_open_disk) {
273 r = j->on_open_disk(j);
274 if (r < 0)
275 return r;
276 }
277
278 if (j->disk_fd >= 0) {
279 /* Check if we can do sparse files */
280
281 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
282 j->allow_sparse = true;
283 else {
284 if (errno != ESPIPE)
285 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
286
287 j->allow_sparse = false;
288 }
289 }
290
291 if (j->calc_checksum) {
292 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
293 log_error("Failed to initialize hash context.");
294 return -EIO;
295 }
296 }
297
298 return 0;
299 }
300
/* Called while in the ANALYZING state: tries to identify the compression
 * format from the payload collected so far. Once detection succeeds the
 * output file is opened, the job switches to RUNNING, and the bytes
 * buffered so far are replayed through the decompression path. */
static int pull_job_detect_compression(PullJob *j) {
        _cleanup_free_ uint8_t *stub = NULL;
        size_t stub_size;

        int r;

        assert(j);

        r = import_uncompress_detect(&j->compress, j->payload, j->payload_size);
        if (r < 0)
                return log_error_errno(r, "Failed to initialize compressor: %m");
        if (r == 0)
                /* Not enough data yet to decide; stay in ANALYZING and wait for more */
                return 0;

        log_debug("Stream is compressed: %s", import_compress_type_to_string(j->compress.type));

        r = pull_job_open_disk(j);
        if (r < 0)
                return r;

        /* Now, take the payload we read so far, and decompress it */
        stub = j->payload;
        stub_size = j->payload_size;

        /* Ownership moved to stub (freed on return via _cleanup_free_);
         * reset the job's buffer so the write path starts from scratch */
        j->payload = NULL;
        j->payload_size = 0;
        j->payload_allocated = 0;

        j->state = PULL_JOB_RUNNING;

        r = pull_job_write_compressed(j, stub, stub_size);
        if (r < 0)
                return r;

        return 0;
}
337
338 static size_t pull_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
339 PullJob *j = userdata;
340 size_t sz = size * nmemb;
341 int r;
342
343 assert(contents);
344 assert(j);
345
346 switch (j->state) {
347
348 case PULL_JOB_ANALYZING:
349 /* Let's first check what it actually is */
350
351 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
352 r = log_oom();
353 goto fail;
354 }
355
356 memcpy(j->payload + j->payload_size, contents, sz);
357 j->payload_size += sz;
358
359 r = pull_job_detect_compression(j);
360 if (r < 0)
361 goto fail;
362
363 break;
364
365 case PULL_JOB_RUNNING:
366
367 r = pull_job_write_compressed(j, contents, sz);
368 if (r < 0)
369 goto fail;
370
371 break;
372
373 case PULL_JOB_DONE:
374 case PULL_JOB_FAILED:
375 r = -ESTALE;
376 goto fail;
377
378 default:
379 assert_not_reached("Impossible state.");
380 }
381
382 return sz;
383
384 fail:
385 pull_job_finish(j, r);
386 return 0;
387 }
388
/* libcurl CURLOPT_HEADERFUNCTION callback: invoked once per response
 * header line. Extracts ETag, Content-Length and Last-Modified; all
 * other headers are handed to the job's on_header hook. Returning a
 * value != sz makes libcurl abort the transfer. */
static size_t pull_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
        PullJob *j = userdata;
        size_t sz = size * nmemb;
        _cleanup_free_ char *length = NULL, *last_modified = NULL;
        char *etag;
        int r;

        assert(contents);
        assert(j);

        if (j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED) {
                r = -ESTALE;
                goto fail;
        }

        /* Headers always precede body data, hence we must still be analyzing */
        assert(j->state == PULL_JOB_ANALYZING);

        r = curl_header_strdup(contents, sz, "ETag:", &etag);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Take ownership of the freshly allocated etag string,
                 * replacing whatever we had before */
                free(j->etag);
                j->etag = etag;

                /* If this etag is among those we already have, there is
                 * nothing new to download; finish successfully right away */
                if (strv_contains(j->old_etags, j->etag)) {
                        log_info("Image already downloaded. Skipping download.");
                        j->etag_exists = true;
                        pull_job_finish(j, 0);
                        return sz;
                }

                return sz;
        }

        r = curl_header_strdup(contents, sz, "Content-Length:", &length);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Best effort: a parse failure leaves content_length at
                 * (uint64_t) -1, i.e. unknown */
                (void) safe_atou64(length, &j->content_length);

                if (j->content_length != (uint64_t) -1) {
                        char bytes[FORMAT_BYTES_MAX];

                        /* Bail out early if the server announces more than we accept */
                        if (j->content_length > j->compressed_max) {
                                log_error("Content too large.");
                                r = -EFBIG;
                                goto fail;
                        }

                        log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
                }

                return sz;
        }

        r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
        if (r < 0) {
                log_oom();
                goto fail;
        }
        if (r > 0) {
                /* Best effort: an unparsable date simply leaves mtime at 0 */
                (void) curl_parse_http_time(last_modified, &j->mtime);
                return sz;
        }

        /* Let the job owner inspect any other header */
        if (j->on_header) {
                r = j->on_header(j, contents, sz);
                if (r < 0)
                        goto fail;
        }

        return sz;

fail:
        pull_job_finish(j, r);
        return 0;
}
470
471 static int pull_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
472 PullJob *j = userdata;
473 unsigned percent;
474 usec_t n;
475
476 assert(j);
477
478 if (dltotal <= 0)
479 return 0;
480
481 percent = ((100 * dlnow) / dltotal);
482 n = now(CLOCK_MONOTONIC);
483
484 if (n > j->last_status_usec + USEC_PER_SEC &&
485 percent != j->progress_percent &&
486 dlnow < dltotal) {
487 char buf[FORMAT_TIMESPAN_MAX];
488
489 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
490 char y[FORMAT_BYTES_MAX];
491 usec_t left, done;
492
493 done = n - j->start_usec;
494 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
495
496 log_info("Got %u%% of %s. %s left at %s/s.",
497 percent,
498 j->url,
499 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
500 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
501 } else
502 log_info("Got %u%% of %s.", percent, j->url);
503
504 j->progress_percent = percent;
505 j->last_status_usec = n;
506
507 if (j->on_progress)
508 j->on_progress(j);
509 }
510
511 return 0;
512 }
513
514 int pull_job_new(PullJob **ret, const char *url, CurlGlue *glue, void *userdata) {
515 _cleanup_(pull_job_unrefp) PullJob *j = NULL;
516
517 assert(url);
518 assert(glue);
519 assert(ret);
520
521 j = new0(PullJob, 1);
522 if (!j)
523 return -ENOMEM;
524
525 j->state = PULL_JOB_INIT;
526 j->disk_fd = -1;
527 j->userdata = userdata;
528 j->glue = glue;
529 j->content_length = (uint64_t) -1;
530 j->start_usec = now(CLOCK_MONOTONIC);
531 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
532
533 j->url = strdup(url);
534 if (!j->url)
535 return -ENOMEM;
536
537 *ret = j;
538 j = NULL;
539
540 return 0;
541 }
542
/* Starts a job created with pull_job_new(): builds the curl easy handle,
 * attaches an If-None-Match header constructed from the old etags,
 * installs the write/header/progress callbacks, and registers the handle
 * with the glue's event loop. Returns 0 or a negative errno. */
int pull_job_begin(PullJob *j) {
        int r;

        assert(j);

        /* Each job may only be started once */
        if (j->state != PULL_JOB_INIT)
                return -EBUSY;

        if (j->grow_machine_directory)
                grow_machine_directory();

        r = curl_glue_make(&j->curl, j->url, j);
        if (r < 0)
                return r;

        /* Send all known etags so the server may reply with 304 Not Modified */
        if (!strv_isempty(j->old_etags)) {
                _cleanup_free_ char *cc = NULL, *hdr = NULL;

                cc = strv_join(j->old_etags, ", ");
                if (!cc)
                        return -ENOMEM;

                hdr = strappend("If-None-Match: ", cc);
                if (!hdr)
                        return -ENOMEM;

                if (!j->request_header) {
                        j->request_header = curl_slist_new(hdr, NULL);
                        if (!j->request_header)
                                return -ENOMEM;
                } else {
                        struct curl_slist *l;

                        /* Append to the header list the caller already set up */
                        l = curl_slist_append(j->request_header, hdr);
                        if (!l)
                                return -ENOMEM;

                        j->request_header = l;
                }
        }

        if (j->request_header) {
                if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
                        return -EIO;
        }

        /* Body data goes through the state machine in pull_job_write_callback() */
        if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, pull_job_write_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, pull_job_header_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, pull_job_progress_callback) != CURLE_OK)
                return -EIO;

        if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
                return -EIO;

        /* Progress callbacks are disabled by default in libcurl; enable them */
        if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
                return -EIO;

        r = curl_glue_add(j->glue, j->curl);
        if (r < 0)
                return r;

        j->state = PULL_JOB_ANALYZING;

        return 0;
}