]> git.ipfire.org Git - pakfire.git/commitdiff
downloader: Store transfers in a queue and only as as many as we want to run in parallel
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 11 Mar 2021 16:43:46 +0000 (16:43 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 11 Mar 2021 16:43:46 +0000 (16:43 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/libpakfire/downloader.c

index 2ed4b06b9b3e82508f56e9cca0d71dd1b5e1bcb9..187ad8ee8b629c7a0f8ac490fe2482f8da3a3416 100644 (file)
@@ -20,6 +20,7 @@
 
 #include <errno.h>
 #include <stdlib.h>
+#include <sys/queue.h>
 #include <unistd.h>
 
 #include <curl/curl.h>
@@ -44,12 +45,28 @@ struct pakfire_mirror {
        unsigned int priority;
 };
 
+struct pakfire_downloader_transfer {
+       TAILQ_ENTRY(pakfire_downloader_transfer) nodes;
+
+       CURL* handle;
+
+       // Where do we write the result to?
+       char path[PATH_MAX];
+
+       // Temporary file
+       char tempfile[PATH_MAX];
+       FILE* f;
+};
+
 struct pakfire_downloader {
        Pakfire pakfire;
        int nrefs;
 
+       unsigned int parallel;
+
        // cURL multi handle
        CURLM* curl;
+       TAILQ_HEAD(transfers, pakfire_downloader_transfer) transfers;
 
        // Mirror stuff
        char baseurl[PATH_MAX];
@@ -57,17 +74,6 @@ struct pakfire_downloader {
        unsigned int num_mirrors;
 };
 
-struct pakfire_downloader_transfer {
-       CURL* handle;
-
-       // Where do we write the result to?
-       char path[PATH_MAX];
-
-       // Temporary file
-       char tempfile[PATH_MAX];
-       FILE* f;
-};
-
 static int pakfire_downloader_setup_curl(struct pakfire_downloader* downloader) {
        // Globally initialise cURL
        if (!curl_initialized++) {
@@ -91,6 +97,19 @@ static int pakfire_downloader_setup_curl(struct pakfire_downloader* downloader)
        return 0;
 }
 
+static void pakfire_downloader_transfer_free(struct pakfire_downloader_transfer* transfer) {
+       if (transfer->handle)
+               curl_easy_cleanup(transfer->handle);
+
+       // Close temporary file
+       if (transfer->f) {
+               unlink(transfer->tempfile);
+               fclose(transfer->f);
+       }
+
+       free(transfer);
+}
+
 static void pakfire_downloader_free(struct pakfire_downloader* downloader) {
        if (downloader->curl)
                curl_multi_cleanup(downloader->curl);
@@ -99,6 +118,11 @@ static void pakfire_downloader_free(struct pakfire_downloader* downloader) {
        if (!--curl_initialized)
                curl_global_cleanup();
 
+       // Free any unprocessed transfers
+       struct pakfire_downloader_transfer* transfer;
+       TAILQ_FOREACH(transfer, &downloader->transfers, nodes)
+               pakfire_downloader_transfer_free(transfer);
+
        // Free mirrors
        if (downloader->mirrors) {
                for (unsigned int i = 0; i < downloader->num_mirrors; i++)
@@ -121,6 +145,12 @@ int pakfire_downloader_create(struct pakfire_downloader** downloader, Pakfire pa
        // Initialize reference counting
        d->nrefs = 1;
 
+       // Set parallelism
+       d->parallel = MAX_PARALLEL;
+
+       // Init transfers queue
+       TAILQ_INIT(&d->transfers);
+
        // Setup cURL
        int r = pakfire_downloader_setup_curl(d);
        if (r)
@@ -233,19 +263,6 @@ static int debug_callback(CURL *handle, curl_infotype type,
 }
 #endif
 
-static void pakfire_downloader_transfer_free(struct pakfire_downloader_transfer* transfer) {
-       if (transfer->handle)
-               curl_easy_cleanup(transfer->handle);
-
-       // Close temporary file
-       if (transfer->f) {
-               unlink(transfer->tempfile);
-               fclose(transfer->f);
-       }
-
-       free(transfer);
-}
-
 static struct pakfire_downloader_transfer* pakfire_downloader_create_transfer(
                struct pakfire_downloader* downloader, const char* path) {
        struct pakfire_downloader_transfer* transfer = calloc(1, sizeof(*transfer));
@@ -325,6 +342,11 @@ int pakfire_downloader_add(struct pakfire_downloader* downloader,
        // Set URL
        curl_easy_setopt(transfer->handle, CURLOPT_URL, url);
 
+       // Push this transfer onto the queue
+       TAILQ_INSERT_HEAD(&downloader->transfers, transfer, nodes);
+
+       return 0;
+
        // Finally, add the handle to the queue
        int r = curl_multi_add_handle(downloader->curl, transfer->handle);
        if (r) {
@@ -336,17 +358,36 @@ int pakfire_downloader_add(struct pakfire_downloader* downloader,
 }
 
 int pakfire_downloader_run(struct pakfire_downloader* downloader) {
+       struct pakfire_downloader_transfer* transfer;
+       unsigned int transfers = 0;
+
+       int r;
        int still_running;
        int msgs_left = -1;
 
        do {
+               // Make sure that we have up to parallel transfers active
+               while (!TAILQ_EMPTY(&downloader->transfers) && transfers < downloader->parallel) {
+                       // Fetch first item added
+                       transfer = TAILQ_LAST(&downloader->transfers, transfers);
+                       TAILQ_REMOVE(&downloader->transfers, transfer, nodes);
+
+                       r = curl_multi_add_handle(downloader->curl, transfer->handle);
+                       if (r) {
+                               ERROR(downloader->pakfire, "Adding handler failed\n");
+                               pakfire_downloader_transfer_free(transfer);
+                       }
+
+                       transfers++;
+               }
+
                curl_multi_perform(downloader->curl, &still_running);
 
                CURLMsg* msg;
                while ((msg = curl_multi_info_read(downloader->curl, &msgs_left))) {
-                       struct pakfire_downloader_transfer* transfer;
-
                        if (msg->msg == CURLMSG_DONE) {
+                               transfers--;
+
                                // Update reference to transfer
                                curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &transfer);
 
@@ -366,7 +407,7 @@ int pakfire_downloader_run(struct pakfire_downloader* downloader) {
                // Wait a little before going through the loop again
                if (still_running)
                        curl_multi_wait(downloader->curl, NULL, 0, 100, NULL);
-       } while (still_running);
+       } while (still_running || !TAILQ_EMPTY(&downloader->transfers));
 
        // Success
        return 0;