From: Michael Tremer Date: Thu, 11 Mar 2021 16:43:46 +0000 (+0000) Subject: downloader: Store transfers in a queue and only as as many as we want to run in parallel X-Git-Tag: 0.9.28~1285^2~569 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=91438bdb1146c49796ad445dd41e9673ee36117e;p=pakfire.git downloader: Store transfers in a queue and only as as many as we want to run in parallel Signed-off-by: Michael Tremer --- diff --git a/src/libpakfire/downloader.c b/src/libpakfire/downloader.c index 2ed4b06b9..187ad8ee8 100644 --- a/src/libpakfire/downloader.c +++ b/src/libpakfire/downloader.c @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -44,12 +45,28 @@ struct pakfire_mirror { unsigned int priority; }; +struct pakfire_downloader_transfer { + TAILQ_ENTRY(pakfire_downloader_transfer) nodes; + + CURL* handle; + + // Where do we write the result to? + char path[PATH_MAX]; + + // Temporary file + char tempfile[PATH_MAX]; + FILE* f; +}; + struct pakfire_downloader { Pakfire pakfire; int nrefs; + unsigned int parallel; + // cURL multi handle CURLM* curl; + TAILQ_HEAD(transfers, pakfire_downloader_transfer) transfers; // Mirror stuff char baseurl[PATH_MAX]; @@ -57,17 +74,6 @@ struct pakfire_downloader { unsigned int num_mirrors; }; -struct pakfire_downloader_transfer { - CURL* handle; - - // Where do we write the result to? - char path[PATH_MAX]; - - // Temporary file - char tempfile[PATH_MAX]; - FILE* f; -}; - static int pakfire_downloader_setup_curl(struct pakfire_downloader* downloader) { // Globally initialise cURL if (!curl_initialized++) { @@ -91,6 +97,19 @@ static int pakfire_downloader_setup_curl(struct pakfire_downloader* downloader) return 0; } +static void pakfire_downloader_transfer_free(struct pakfire_downloader_transfer* transfer) { + if (transfer->handle) + curl_easy_cleanup(transfer->handle); + + // Close temporary file + if (transfer->f) { + unlink(transfer->tempfile); + fclose(transfer->f); + } + + free(transfer); +} + static void pakfire_downloader_free(struct pakfire_downloader* downloader) { if (downloader->curl) curl_multi_cleanup(downloader->curl); @@ -99,6 +118,11 @@ static void pakfire_downloader_free(struct pakfire_downloader* downloader) { if (!--curl_initialized) curl_global_cleanup(); + // Free any unprocessed transfers + struct pakfire_downloader_transfer* transfer; + TAILQ_FOREACH(transfer, &downloader->transfers, nodes) + pakfire_downloader_transfer_free(transfer); + // Free mirrors if (downloader->mirrors) { for (unsigned int i = 0; i < downloader->num_mirrors; i++) @@ -121,6 +145,12 @@ int pakfire_downloader_create(struct pakfire_downloader** downloader, Pakfire pa // Initialize reference counting d->nrefs = 1; + // Set parallelism + d->parallel = MAX_PARALLEL; + + // Init transfers queue + TAILQ_INIT(&d->transfers); + // Setup cURL int r = pakfire_downloader_setup_curl(d); if (r) @@ -233,19 +263,6 @@ static int debug_callback(CURL *handle, curl_infotype type, } #endif -static void pakfire_downloader_transfer_free(struct pakfire_downloader_transfer* transfer) { - if (transfer->handle) - curl_easy_cleanup(transfer->handle); - - // Close temporary file - if (transfer->f) { - unlink(transfer->tempfile); - fclose(transfer->f); - } - - free(transfer); -} - static struct pakfire_downloader_transfer* pakfire_downloader_create_transfer( struct pakfire_downloader* downloader, const char* path) { struct pakfire_downloader_transfer* transfer = calloc(1, sizeof(*transfer)); @@ -325,6 +342,11 @@ int pakfire_downloader_add(struct pakfire_downloader* downloader, // Set URL curl_easy_setopt(transfer->handle, CURLOPT_URL, url); + // Push this transfer onto the queue + TAILQ_INSERT_HEAD(&downloader->transfers, transfer, nodes); + + return 0; + // Finally, add the handle to the queue int r = curl_multi_add_handle(downloader->curl, transfer->handle); if (r) { @@ -336,17 +358,36 @@ int pakfire_downloader_add(struct pakfire_downloader* downloader, } int pakfire_downloader_run(struct pakfire_downloader* downloader) { + struct pakfire_downloader_transfer* transfer; + unsigned int transfers = 0; + + int r; int still_running; int msgs_left = -1; do { + // Make sure that we have up to parallel transfers active + while (!TAILQ_EMPTY(&downloader->transfers) && transfers < downloader->parallel) { + // Fetch first item added + transfer = TAILQ_LAST(&downloader->transfers, transfers); + TAILQ_REMOVE(&downloader->transfers, transfer, nodes); + + r = curl_multi_add_handle(downloader->curl, transfer->handle); + if (r) { + ERROR(downloader->pakfire, "Adding handler failed\n"); + pakfire_downloader_transfer_free(transfer); + } + + transfers++; + } + curl_multi_perform(downloader->curl, &still_running); CURLMsg* msg; while ((msg = curl_multi_info_read(downloader->curl, &msgs_left))) { - struct pakfire_downloader_transfer* transfer; - if (msg->msg == CURLMSG_DONE) { + transfers--; + // Update reference to transfer curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &transfer); @@ -366,7 +407,7 @@ int pakfire_downloader_run(struct pakfire_downloader* downloader) { // Wait a little before going through the loop again if (still_running) curl_multi_wait(downloader->curl, NULL, 0, 100, NULL); - } while (still_running); + } while (still_running || !TAILQ_EMPTY(&downloader->transfers)); // Success return 0;