]> git.ipfire.org Git - thirdparty/systemd.git/blob - src/import/pull-job.c
4736306de22c0ecf425096a8635f7b8f44a8be50
[thirdparty/systemd.git] / src / import / pull-job.c
1 /*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
2
3 /***
4 This file is part of systemd.
5
6 Copyright 2015 Lennart Poettering
7
8 systemd is free software; you can redistribute it and/or modify it
9 under the terms of the GNU Lesser General Public License as published by
10 the Free Software Foundation; either version 2.1 of the License, or
11 (at your option) any later version.
12
13 systemd is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 Lesser General Public License for more details.
17
18 You should have received a copy of the GNU Lesser General Public License
19 along with systemd; If not, see <http://www.gnu.org/licenses/>.
20 ***/
21
22 #include <sys/xattr.h>
23
24 #include "fd-util.h"
25 #include "io-util.h"
26 #include "machine-pool.h"
27 #include "pull-job.h"
28 #include "string-util.h"
29 #include "strv.h"
30
31 PullJob* pull_job_unref(PullJob *j) {
32 if (!j)
33 return NULL;
34
35 curl_glue_remove_and_free(j->glue, j->curl);
36 curl_slist_free_all(j->request_header);
37
38 safe_close(j->disk_fd);
39
40 import_compress_free(&j->compress);
41
42 if (j->checksum_context)
43 gcry_md_close(j->checksum_context);
44
45 free(j->url);
46 free(j->etag);
47 strv_free(j->old_etags);
48 free(j->payload);
49 free(j->checksum);
50
51 free(j);
52
53 return NULL;
54 }
55
56 static void pull_job_finish(PullJob *j, int ret) {
57 assert(j);
58
59 if (j->state == PULL_JOB_DONE ||
60 j->state == PULL_JOB_FAILED)
61 return;
62
63 if (ret == 0) {
64 j->state = PULL_JOB_DONE;
65 j->progress_percent = 100;
66 log_info("Download of %s complete.", j->url);
67 } else {
68 j->state = PULL_JOB_FAILED;
69 j->error = ret;
70 }
71
72 if (j->on_finished)
73 j->on_finished(j);
74 }
75
/* Completion callback invoked by the curl glue when a transfer ends.
 * Validates the transfer result and HTTP status, verifies download
 * completeness and the SHA256 checksum (if enabled), finalizes the output
 * file (size, xattrs, timestamps), and then settles the job via
 * pull_job_finish(). */
void pull_job_curl_on_finished(CurlGlue *g, CURL *curl, CURLcode result) {
        PullJob *j = NULL;
        CURLcode code;
        long status;
        int r;

        /* Recover the PullJob we stashed in the easy handle's private
         * pointer; without it there is nothing to finish. */
        if (curl_easy_getinfo(curl, CURLINFO_PRIVATE, (char **)&j) != CURLE_OK)
                return;

        /* Ignore late notifications for jobs that already settled. */
        if (!j || j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED)
                return;

        if (result != CURLE_OK) {
                log_error("Transfer failed: %s", curl_easy_strerror(result));
                r = -EIO;
                goto finish;
        }

        code = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
        if (code != CURLE_OK) {
                log_error("Failed to retrieve response code: %s", curl_easy_strerror(code));
                r = -EIO;
                goto finish;
        } else if (status == 304) {
                /* "Not Modified": our If-None-Match ETag matched, we already
                 * have this image locally. */
                log_info("Image already downloaded. Skipping download.");
                j->etag_exists = true;
                r = 0;
                goto finish;
        } else if (status >= 300) {
                log_error("HTTP request to %s failed with code %li.", j->url, status);
                r = -EIO;
                goto finish;
        } else if (status < 200) {
                /* 1xx informational responses are not expected here. */
                log_error("HTTP request to %s finished with unexpected code %li.", j->url, status);
                r = -EIO;
                goto finish;
        }

        /* A 2xx status but state != RUNNING means the body never started
         * flowing through the write callback. */
        if (j->state != PULL_JOB_RUNNING) {
                log_error("Premature connection termination.");
                r = -EIO;
                goto finish;
        }

        /* If the server announced a Content-Length, insist we got all of it. */
        if (j->content_length != (uint64_t) -1 &&
            j->content_length != j->written_compressed) {
                log_error("Download truncated.");
                r = -EIO;
                goto finish;
        }

        if (j->checksum_context) {
                uint8_t *k;

                /* Finalize the SHA256 over the (compressed) wire data. */
                k = gcry_md_read(j->checksum_context, GCRY_MD_SHA256);
                if (!k) {
                        log_error("Failed to get checksum.");
                        r = -EIO;
                        goto finish;
                }

                j->checksum = hexmem(k, gcry_md_get_algo_dlen(GCRY_MD_SHA256));
                if (!j->checksum) {
                        r = log_oom();
                        goto finish;
                }

                log_debug("SHA256 of %s is %s.", j->url, j->checksum);
        }

        if (j->disk_fd >= 0 && j->allow_sparse) {
                /* Make sure the file size is right, in case the file was
                 * sparse and we just seeked for the last part */

                if (ftruncate(j->disk_fd, j->written_uncompressed) < 0) {
                        r = log_error_errno(errno, "Failed to truncate file: %m");
                        goto finish;
                }

                /* Record provenance as xattrs; best-effort, failures ignored. */
                if (j->etag)
                        (void) fsetxattr(j->disk_fd, "user.source_etag", j->etag, strlen(j->etag), 0);
                if (j->url)
                        (void) fsetxattr(j->disk_fd, "user.source_url", j->url, strlen(j->url), 0);

                /* Propagate the server's Last-Modified time to the file. */
                if (j->mtime != 0) {
                        struct timespec ut[2];

                        timespec_store(&ut[0], j->mtime);
                        ut[1] = ut[0];
                        (void) futimens(j->disk_fd, ut);

                        (void) fd_setcrtime(j->disk_fd, j->mtime);
                }
        }

        r = 0;

finish:
        pull_job_finish(j, r);
}
176
177 static int pull_job_write_uncompressed(const void *p, size_t sz, void *userdata) {
178 PullJob *j = userdata;
179 ssize_t n;
180
181 assert(j);
182 assert(p);
183
184 if (sz <= 0)
185 return 0;
186
187 if (j->written_uncompressed + sz < j->written_uncompressed) {
188 log_error("File too large, overflow");
189 return -EOVERFLOW;
190 }
191
192 if (j->written_uncompressed + sz > j->uncompressed_max) {
193 log_error("File overly large, refusing");
194 return -EFBIG;
195 }
196
197 if (j->disk_fd >= 0) {
198
199 if (j->grow_machine_directory && j->written_since_last_grow >= GROW_INTERVAL_BYTES) {
200 j->written_since_last_grow = 0;
201 grow_machine_directory();
202 }
203
204 if (j->allow_sparse)
205 n = sparse_write(j->disk_fd, p, sz, 64);
206 else
207 n = write(j->disk_fd, p, sz);
208 if (n < 0)
209 return log_error_errno(errno, "Failed to write file: %m");
210 if ((size_t) n < sz) {
211 log_error("Short write");
212 return -EIO;
213 }
214 } else {
215
216 if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz))
217 return log_oom();
218
219 memcpy(j->payload + j->payload_size, p, sz);
220 j->payload_size += sz;
221 }
222
223 j->written_uncompressed += sz;
224 j->written_since_last_grow += sz;
225
226 return 0;
227 }
228
229 static int pull_job_write_compressed(PullJob *j, void *p, size_t sz) {
230 int r;
231
232 assert(j);
233 assert(p);
234
235 if (sz <= 0)
236 return 0;
237
238 if (j->written_compressed + sz < j->written_compressed) {
239 log_error("File too large, overflow");
240 return -EOVERFLOW;
241 }
242
243 if (j->written_compressed + sz > j->compressed_max) {
244 log_error("File overly large, refusing.");
245 return -EFBIG;
246 }
247
248 if (j->content_length != (uint64_t) -1 &&
249 j->written_compressed + sz > j->content_length) {
250 log_error("Content length incorrect.");
251 return -EFBIG;
252 }
253
254 if (j->checksum_context)
255 gcry_md_write(j->checksum_context, p, sz);
256
257 r = import_uncompress(&j->compress, p, sz, pull_job_write_uncompressed, j);
258 if (r < 0)
259 return r;
260
261 j->written_compressed += sz;
262
263 return 0;
264 }
265
266 static int pull_job_open_disk(PullJob *j) {
267 int r;
268
269 assert(j);
270
271 if (j->on_open_disk) {
272 r = j->on_open_disk(j);
273 if (r < 0)
274 return r;
275 }
276
277 if (j->disk_fd >= 0) {
278 /* Check if we can do sparse files */
279
280 if (lseek(j->disk_fd, SEEK_SET, 0) == 0)
281 j->allow_sparse = true;
282 else {
283 if (errno != ESPIPE)
284 return log_error_errno(errno, "Failed to seek on file descriptor: %m");
285
286 j->allow_sparse = false;
287 }
288 }
289
290 if (j->calc_checksum) {
291 if (gcry_md_open(&j->checksum_context, GCRY_MD_SHA256, 0) != 0) {
292 log_error("Failed to initialize hash context.");
293 return -EIO;
294 }
295 }
296
297 return 0;
298 }
299
/* Called while the job is in the ANALYZING state: tries to identify the
 * compression format from the payload buffered so far. Once detection
 * succeeds, opens the output disk, transitions the job to RUNNING and
 * replays the buffered bytes through the regular write path. Returns 0 on
 * success (including "need more data"), negative on error. */
static int pull_job_detect_compression(PullJob *j) {
        _cleanup_free_ uint8_t *stub = NULL;
        size_t stub_size;

        int r;

        assert(j);

        r = import_uncompress_detect(&j->compress, j->payload, j->payload_size);
        if (r < 0)
                return log_error_errno(r, "Failed to initialize compressor: %m");
        /* r == 0: not enough bytes yet to decide — wait for more data. */
        if (r == 0)
                return 0;

        log_debug("Stream is compressed: %s", import_compress_type_to_string(j->compress.type));

        r = pull_job_open_disk(j);
        if (r < 0)
                return r;

        /* Now, take the payload we read so far, and decompress it */
        /* Ownership of the buffer moves to "stub", which _cleanup_free_
         * releases when we leave this function. */
        stub = j->payload;
        stub_size = j->payload_size;

        /* Reset the job's buffer so the write path starts fresh. */
        j->payload = NULL;
        j->payload_size = 0;
        j->payload_allocated = 0;

        /* Must switch state before replaying: the write path asserts it. */
        j->state = PULL_JOB_RUNNING;

        r = pull_job_write_compressed(j, stub, stub_size);
        if (r < 0)
                return r;

        return 0;
}
336
/* curl CURLOPT_WRITEFUNCTION callback: receives body data and dispatches on
 * the job state. While ANALYZING, data is buffered and compression detection
 * is attempted; while RUNNING, data is streamed straight into the
 * decompress/write pipeline. Returns the number of bytes consumed; returning
 * 0 (fewer than sz) tells curl to abort the transfer. */
static size_t pull_job_write_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
        PullJob *j = userdata;
        size_t sz = size * nmemb;
        int r;

        assert(contents);
        assert(j);

        switch (j->state) {

        case PULL_JOB_ANALYZING:
                /* Let's first check what it actually is */

                if (!GREEDY_REALLOC(j->payload, j->payload_allocated, j->payload_size + sz)) {
                        r = log_oom();
                        goto fail;
                }

                memcpy(j->payload + j->payload_size, contents, sz);
                j->payload_size += sz;

                /* May flip the state to RUNNING and replay the buffer. */
                r = pull_job_detect_compression(j);
                if (r < 0)
                        goto fail;

                break;

        case PULL_JOB_RUNNING:

                r = pull_job_write_compressed(j, contents, sz);
                if (r < 0)
                        goto fail;

                break;

        case PULL_JOB_DONE:
        case PULL_JOB_FAILED:
                /* Data arriving after the job settled is stale. */
                r = -ESTALE;
                goto fail;

        default:
                assert_not_reached("Impossible state.");
        }

        return sz;

fail:
        pull_job_finish(j, r);
        return 0;
}
387
388 static size_t pull_job_header_callback(void *contents, size_t size, size_t nmemb, void *userdata) {
389 PullJob *j = userdata;
390 size_t sz = size * nmemb;
391 _cleanup_free_ char *length = NULL, *last_modified = NULL;
392 char *etag;
393 int r;
394
395 assert(contents);
396 assert(j);
397
398 if (j->state == PULL_JOB_DONE || j->state == PULL_JOB_FAILED) {
399 r = -ESTALE;
400 goto fail;
401 }
402
403 assert(j->state == PULL_JOB_ANALYZING);
404
405 r = curl_header_strdup(contents, sz, "ETag:", &etag);
406 if (r < 0) {
407 log_oom();
408 goto fail;
409 }
410 if (r > 0) {
411 free(j->etag);
412 j->etag = etag;
413
414 if (strv_contains(j->old_etags, j->etag)) {
415 log_info("Image already downloaded. Skipping download.");
416 j->etag_exists = true;
417 pull_job_finish(j, 0);
418 return sz;
419 }
420
421 return sz;
422 }
423
424 r = curl_header_strdup(contents, sz, "Content-Length:", &length);
425 if (r < 0) {
426 log_oom();
427 goto fail;
428 }
429 if (r > 0) {
430 (void) safe_atou64(length, &j->content_length);
431
432 if (j->content_length != (uint64_t) -1) {
433 char bytes[FORMAT_BYTES_MAX];
434
435 if (j->content_length > j->compressed_max) {
436 log_error("Content too large.");
437 r = -EFBIG;
438 goto fail;
439 }
440
441 log_info("Downloading %s for %s.", format_bytes(bytes, sizeof(bytes), j->content_length), j->url);
442 }
443
444 return sz;
445 }
446
447 r = curl_header_strdup(contents, sz, "Last-Modified:", &last_modified);
448 if (r < 0) {
449 log_oom();
450 goto fail;
451 }
452 if (r > 0) {
453 (void) curl_parse_http_time(last_modified, &j->mtime);
454 return sz;
455 }
456
457 if (j->on_header) {
458 r = j->on_header(j, contents, sz);
459 if (r < 0)
460 goto fail;
461 }
462
463 return sz;
464
465 fail:
466 pull_job_finish(j, r);
467 return 0;
468 }
469
470 static int pull_job_progress_callback(void *userdata, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
471 PullJob *j = userdata;
472 unsigned percent;
473 usec_t n;
474
475 assert(j);
476
477 if (dltotal <= 0)
478 return 0;
479
480 percent = ((100 * dlnow) / dltotal);
481 n = now(CLOCK_MONOTONIC);
482
483 if (n > j->last_status_usec + USEC_PER_SEC &&
484 percent != j->progress_percent &&
485 dlnow < dltotal) {
486 char buf[FORMAT_TIMESPAN_MAX];
487
488 if (n - j->start_usec > USEC_PER_SEC && dlnow > 0) {
489 char y[FORMAT_BYTES_MAX];
490 usec_t left, done;
491
492 done = n - j->start_usec;
493 left = (usec_t) (((double) done * (double) dltotal) / dlnow) - done;
494
495 log_info("Got %u%% of %s. %s left at %s/s.",
496 percent,
497 j->url,
498 format_timespan(buf, sizeof(buf), left, USEC_PER_SEC),
499 format_bytes(y, sizeof(y), (uint64_t) ((double) dlnow / ((double) done / (double) USEC_PER_SEC))));
500 } else
501 log_info("Got %u%% of %s.", percent, j->url);
502
503 j->progress_percent = percent;
504 j->last_status_usec = n;
505
506 if (j->on_progress)
507 j->on_progress(j);
508 }
509
510 return 0;
511 }
512
513 int pull_job_new(PullJob **ret, const char *url, CurlGlue *glue, void *userdata) {
514 _cleanup_(pull_job_unrefp) PullJob *j = NULL;
515
516 assert(url);
517 assert(glue);
518 assert(ret);
519
520 j = new0(PullJob, 1);
521 if (!j)
522 return -ENOMEM;
523
524 j->state = PULL_JOB_INIT;
525 j->disk_fd = -1;
526 j->userdata = userdata;
527 j->glue = glue;
528 j->content_length = (uint64_t) -1;
529 j->start_usec = now(CLOCK_MONOTONIC);
530 j->compressed_max = j->uncompressed_max = 8LLU * 1024LLU * 1024LLU * 1024LLU; /* 8GB */
531
532 j->url = strdup(url);
533 if (!j->url)
534 return -ENOMEM;
535
536 *ret = j;
537 j = NULL;
538
539 return 0;
540 }
541
542 int pull_job_begin(PullJob *j) {
543 int r;
544
545 assert(j);
546
547 if (j->state != PULL_JOB_INIT)
548 return -EBUSY;
549
550 if (j->grow_machine_directory)
551 grow_machine_directory();
552
553 r = curl_glue_make(&j->curl, j->url, j);
554 if (r < 0)
555 return r;
556
557 if (!strv_isempty(j->old_etags)) {
558 _cleanup_free_ char *cc = NULL, *hdr = NULL;
559
560 cc = strv_join(j->old_etags, ", ");
561 if (!cc)
562 return -ENOMEM;
563
564 hdr = strappend("If-None-Match: ", cc);
565 if (!hdr)
566 return -ENOMEM;
567
568 if (!j->request_header) {
569 j->request_header = curl_slist_new(hdr, NULL);
570 if (!j->request_header)
571 return -ENOMEM;
572 } else {
573 struct curl_slist *l;
574
575 l = curl_slist_append(j->request_header, hdr);
576 if (!l)
577 return -ENOMEM;
578
579 j->request_header = l;
580 }
581 }
582
583 if (j->request_header) {
584 if (curl_easy_setopt(j->curl, CURLOPT_HTTPHEADER, j->request_header) != CURLE_OK)
585 return -EIO;
586 }
587
588 if (curl_easy_setopt(j->curl, CURLOPT_WRITEFUNCTION, pull_job_write_callback) != CURLE_OK)
589 return -EIO;
590
591 if (curl_easy_setopt(j->curl, CURLOPT_WRITEDATA, j) != CURLE_OK)
592 return -EIO;
593
594 if (curl_easy_setopt(j->curl, CURLOPT_HEADERFUNCTION, pull_job_header_callback) != CURLE_OK)
595 return -EIO;
596
597 if (curl_easy_setopt(j->curl, CURLOPT_HEADERDATA, j) != CURLE_OK)
598 return -EIO;
599
600 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFOFUNCTION, pull_job_progress_callback) != CURLE_OK)
601 return -EIO;
602
603 if (curl_easy_setopt(j->curl, CURLOPT_XFERINFODATA, j) != CURLE_OK)
604 return -EIO;
605
606 if (curl_easy_setopt(j->curl, CURLOPT_NOPROGRESS, 0) != CURLE_OK)
607 return -EIO;
608
609 r = curl_glue_add(j->glue, j->curl);
610 if (r < 0)
611 return r;
612
613 j->state = PULL_JOB_ANALYZING;
614
615 return 0;
616 }