]> git.ipfire.org Git - pakfire.git/commitdiff
httpclient: Refactor the queue management
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 1 Feb 2025 13:40:03 +0000 (13:40 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 1 Feb 2025 13:41:25 +0000 (13:41 +0000)
It does not seem like a very good idea to give cURL all the handles at
the same time and let it sort it all out. Therefore, this is yet another
attempt to implement some half-decent queueing to implement parallel
downloads.

Fixes: #13812 - httpclient: Bring back a queue for transfers
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/httpclient.c
src/pakfire/httpclient.h
src/pakfire/xfer.c

index fc1cb27a8569e6a5885dc2efb0e2d7be565313b3..9e66ad72fdb78445f568919232733fddb305593c 100644 (file)
@@ -31,6 +31,7 @@
 #include <pakfire/progress.h>
 #include <pakfire/string.h>
 #include <pakfire/xfer.h>
+#include <pakfire/util.h>
 
 // The number of concurrent downloads
 #define DEFAULT_MAX_PARALLEL                   4
@@ -39,6 +40,13 @@ struct pakfire_httpclient_xfer {
        TAILQ_ENTRY(pakfire_httpclient_xfer) nodes;
 
        struct pakfire_xfer* xfer;
+
+       // Store the status
+       enum pakfire_httpclient_xfer_status {
+               PAKFIRE_XFER_QUEUED,
+               PAKFIRE_XFER_RUNNING,
+               PAKFIRE_XFER_FINISHED,
+       } status;
 };
 
 struct pakfire_httpclient {
@@ -64,7 +72,7 @@ struct pakfire_httpclient {
        struct pakfire_progress* progress;
 
        // How many transfers in parallel?
-       long max_parallel;
+       long unsigned max_parallel;
 
        // cURL multi handle
        CURLM* curl;
@@ -95,6 +103,9 @@ static int pakfire_httpclient_xfer_create(
        // Store a reference to the xfer
        e->xfer = pakfire_xfer_ref(xfer);
 
+       // Start as queued
+       e->status = PAKFIRE_XFER_QUEUED;
+
        // Return the pointer
        *x = e;
 
@@ -107,6 +118,41 @@ static void pakfire_httpclient_xfer_free(struct pakfire_httpclient_xfer* x) {
        free(x);
 }
 
+static size_t pakfire_httpclient_count_xfers(
+               struct pakfire_httpclient* self, const enum pakfire_httpclient_xfer_status status) {
+       struct pakfire_httpclient_xfer* e = NULL;
+       size_t counter = 0;
+
+       TAILQ_FOREACH(e, &self->xfers, nodes) {
+               if (e->status == status)
+                       counter++;
+       }
+
+       return counter;
+}
+
+static size_t pakfire_httpclient_num_running_xfers(struct pakfire_httpclient* self) {
+       return pakfire_httpclient_count_xfers(self, PAKFIRE_XFER_RUNNING);
+}
+
+static size_t pakfire_httpclient_num_queued_xfers(struct pakfire_httpclient* self) {
+       return pakfire_httpclient_count_xfers(self, PAKFIRE_XFER_QUEUED);
+}
+
+/*
+       Returns the next queued xfer or NULL
+*/
+static struct pakfire_httpclient_xfer* pakfire_httpclient_get_queued_xfer(struct pakfire_httpclient* self) {
+       struct pakfire_httpclient_xfer* e = NULL;
+
+       TAILQ_FOREACH(e, &self->xfers, nodes) {
+               if (e->status == PAKFIRE_XFER_QUEUED)
+                       return e;
+       }
+
+       return NULL;
+}
+
 static struct pakfire_httpclient_xfer* pakfire_httpclient_xfer_find(
                struct pakfire_httpclient* self, struct pakfire_xfer* xfer) {
        struct pakfire_httpclient_xfer* e = NULL;
@@ -119,7 +165,77 @@ static struct pakfire_httpclient_xfer* pakfire_httpclient_xfer_find(
        return NULL;
 }
 
+
+static int pakfire_httpclient_remove(
+               struct pakfire_httpclient* self, struct pakfire_httpclient_xfer* e) {
+       int r;
+
+       // Nothing to do if not running
+       switch (e->status) {
+               case PAKFIRE_XFER_RUNNING:
+                       break;
+
+               case PAKFIRE_XFER_QUEUED:
+               case PAKFIRE_XFER_FINISHED:
+                       return 0;
+       }
+
+       // Remove the handle
+       r = curl_multi_remove_handle(self->curl, pakfire_xfer_handle(e->xfer));
+       if (r) {
+               ERROR(self->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r));
+               return -ENOTSUP;
+       }
+
+       // Mark as finished
+       e->status = PAKFIRE_XFER_FINISHED;
+
+       return 0;
+}
+
+static int pakfire_httpclient_launch_one(
+               struct pakfire_httpclient* self, struct pakfire_httpclient_xfer* e) {
+       int r;
+
+       // Prepare the xfer
+       r = pakfire_xfer_prepare(e->xfer, self->progress, 0);
+       if (r < 0)
+               return r;
+
+       // Add the handle to cURL
+       r = curl_multi_add_handle(self->curl, pakfire_xfer_handle(e->xfer));
+       if (r) {
+               ERROR(self->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
+               return -ENOTSUP;
+       }
+
+       // Mark as running
+       e->status = PAKFIRE_XFER_RUNNING;
+
+       return 0;
+}
+
+static int pakfire_httpclient_launch(struct pakfire_httpclient* self) {
+       struct pakfire_httpclient_xfer* e = NULL;
+       int r;
+
+       while (pakfire_httpclient_num_running_xfers(self) < self->max_parallel) {
+               // Fetch the next queued xfer
+               e = pakfire_httpclient_get_queued_xfer(self);
+               if (!e)
+                       break;
+
+               // Launch it
+               r = pakfire_httpclient_launch_one(self, e);
+               if (r < 0)
+                       return r;
+       }
+
+       return 0;
+}
+
 static int pakfire_httpclient_check(struct pakfire_httpclient* self) {
+       struct pakfire_httpclient_xfer* e = NULL;
        struct pakfire_xfer* xfer = NULL;
        int r;
 
@@ -137,25 +253,40 @@ static int pakfire_httpclient_check(struct pakfire_httpclient* self) {
                                // Update reference to transfer
                                curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer);
 
-                               // Call the done callback
-                               r = pakfire_xfer_done(xfer, msg->data.result);
-                               switch (-r) {
-                                       // If we are asked to try again we will re-queue the transfer
-                                       case EAGAIN:
-                                               r = pakfire_httpclient_enqueue_xfer(self, xfer);
-                                               if (r)
-                                                       return r;
+                               // Find the queue element
+                               e = pakfire_httpclient_xfer_find(self, xfer);
+                               if (!e) {
+                                       ERROR(self->ctx, "Could not find queued transfer\n");
+                                       return -EINVAL;
+                               }
 
-                                               break;
+                               // Remove the handle
+                               r = pakfire_httpclient_remove(self, e);
+                               if (r < 0)
+                                       return r;
 
-                                       // Otherwise this transfer has finished
-                                       default:
-                                               if (r)
+                               // Call the done callback
+                               r = pakfire_xfer_done(xfer, msg->data.result);
+                               if (r < 0) {
+                                       switch (-r) {
+                                               case EAGAIN:
+                                                       // Launch it again
+                                                       r = pakfire_httpclient_launch_one(self, e);
+                                                       if (r < 0)
+                                                               return r;
+
+                                                       // Move to the next message
+                                                       continue;
+
+                                               // Raise any errors
+                                               default:
                                                        return r;
-
-                                               break;
+                                       }
                                }
 
+                               // Remove the xfer from the queue
+                               pakfire_httpclient_dequeue_xfer(self, xfer);
+
                                // Reset transfer
                                xfer = NULL;
                                break;
@@ -166,6 +297,11 @@ static int pakfire_httpclient_check(struct pakfire_httpclient* self) {
                }
        }
 
+       // Launch some more transfers
+       r = pakfire_httpclient_launch(self);
+       if (r < 0)
+               return r;
+
        // If we control the loop, terminate it if there are no more transfers left
        if (pakfire_httpclient_has_flag(self, PAKFIRE_HTTPCLIENT_OWN_LOOP)) {
                if (self->still_running <= 0)
@@ -557,48 +693,30 @@ int pakfire_httpclient_enqueue_xfer(struct pakfire_httpclient* self,
        if (r < 0)
                return r;
 
-       // Prepare the xfer
-       r = pakfire_xfer_prepare(xfer, self->progress, 0);
-       if (r < 0)
-               goto ERROR;
-
-       // Increment the xfer counter
-       self->total_xfers++;
-
        // Update the total download size
        self->total_downloadsize += pakfire_xfer_get_size(xfer);
 
-       // Add the handle to cURL
-       r = curl_multi_add_handle(self->curl, pakfire_xfer_handle(xfer));
-       if (r) {
-               ERROR(self->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
-               goto ERROR;
-       }
-
        // Keep a reference to the xfer
        TAILQ_INSERT_TAIL(&self->xfers, e, nodes);
 
        // Transfer enqueued
        return pakfire_xfer_enqueued(xfer, self);
-
-ERROR:
-       if (e)
-               pakfire_httpclient_xfer_free(e);
-
-       return r;
 }
 
 int pakfire_httpclient_dequeue_xfer(struct pakfire_httpclient* self,
                struct pakfire_xfer* xfer) {
        struct pakfire_httpclient_xfer* e = NULL;
+       int r;
 
        // Find reference
        e = pakfire_httpclient_xfer_find(self, xfer);
        if (!e)
                return 0;
 
-       // Decrement the xfers counter
-       self->total_xfers--;
+       // Remove the transfer
+       r = pakfire_httpclient_remove(self, e);
+       if (r < 0)
+               return r;
 
        // Reduce the total download size
        self->total_downloadsize -= pakfire_xfer_get_size(xfer);
@@ -609,25 +727,11 @@ int pakfire_httpclient_dequeue_xfer(struct pakfire_httpclient* self,
        return 0;
 }
 
-int pakfire_httpclient_remove_xfer(struct pakfire_httpclient* self,
-               struct pakfire_xfer* xfer) {
-       int r;
-
-       // Remove the handle
-       r = curl_multi_remove_handle(self->curl, pakfire_xfer_handle(xfer));
-       if (r) {
-               ERROR(self->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r));
-               return -ENOTSUP;
-       }
-
-       return 0;
-}
-
 int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title) {
        int r = 0;
 
        // Cannot run without any transfers
-       if (!self->total_xfers) {
+       if (TAILQ_EMPTY(&self->xfers)) {
                DEBUG(self->ctx, "Skipping running HTTP client without any transfers\n");
                return 0;
        }
@@ -642,6 +746,11 @@ int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title) {
        if (r)
                goto ERROR;
 
+       // Launch some transfers
+       r = pakfire_httpclient_launch(self);
+       if (r < 0)
+               goto ERROR;
+
        // Run the event loop
        r = sd_event_loop(self->loop);
        if (r < 0) {
@@ -649,6 +758,18 @@ int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title) {
                goto ERROR;
        }
 
+       if (pakfire_httpclient_num_running_xfers(self)) {
+               ERROR(self->ctx, "HTTP client ended with running transfers\n");
+               r = -ECANCELED;
+               goto ERROR;
+       }
+
+       if (pakfire_httpclient_num_queued_xfers(self)) {
+               ERROR(self->ctx, "Not all queued items have been downloaded\n");
+               r = -ECANCELED;
+               goto ERROR;
+       }
+
 ERROR:
        // We are finished!
        pakfire_progress_finish(self->progress);
index 962f86a44e9523e97539101494bd79a1e4e74fcf..730f7b4d7ce58ec8e127e95f8bdb29b160a85b37 100644 (file)
@@ -41,8 +41,6 @@ int pakfire_httpclient_enqueue_xfer(
        struct pakfire_httpclient* self, struct pakfire_xfer* xfer);
 int pakfire_httpclient_dequeue_xfer(
        struct pakfire_httpclient* self, struct pakfire_xfer* xfer);
-int pakfire_httpclient_remove_xfer(
-       struct pakfire_httpclient* self, struct pakfire_xfer* xfer);
 
 int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title);
 
index c6e9bae921a04ed245fe24726eba00666e1371dc..ac6375197f54e48438f67f30087ee693307e651a 100644 (file)
@@ -147,7 +147,7 @@ static void pakfire_xfer_free(struct pakfire_xfer* xfer) {
 
        // Remove the cURL handle
        if (xfer->client)
-               pakfire_httpclient_remove_xfer(xfer->client, xfer);
+               pakfire_httpclient_dequeue_xfer(xfer->client, xfer);
 
        // systemd
        if (xfer->event)
@@ -1323,7 +1323,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
        if (xfer->progress) {
                r = pakfire_progress_finish(xfer->progress);
                if (r < 0)
-                       goto ERROR;
+                       return r;
        }
 
        // Log the result
@@ -1356,7 +1356,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
        // Download Size
        r = curl_easy_getinfo(h, CURLINFO_SIZE_DOWNLOAD_T, &download_size);
        if (r)
-               goto ERROR;
+               return r;
 
        if (download_size)
                DEBUG(xfer->ctx, "  Download Size: %ld bytes\n", download_size);
@@ -1364,7 +1364,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
        // Download Speed
        r = curl_easy_getinfo(h, CURLINFO_SPEED_DOWNLOAD_T, &download_speed);
        if (r)
-               goto ERROR;
+               return r;
 
        if (download_speed)
                DEBUG(xfer->ctx, "  Download Speed: %ld bps\n", download_speed);
@@ -1372,7 +1372,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
        // Upload Size
        r = curl_easy_getinfo(h, CURLINFO_SIZE_UPLOAD_T, &upload_size);
        if (r)
-               goto ERROR;
+               return r;
 
        if (upload_size)
                DEBUG(xfer->ctx, "  Upload Size: %ld bytes\n", upload_size);
@@ -1380,7 +1380,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
        // Upload Speed
        r = curl_easy_getinfo(h, CURLINFO_SPEED_UPLOAD_T, &upload_speed);
        if (r)
-               goto ERROR;
+               return r;
 
        if (upload_speed)
                DEBUG(xfer->ctx, "  Upload Speed: %ld bps\n", upload_speed);
@@ -1392,19 +1392,19 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
                                // Verify the received payload
                                r = pakfire_xfer_verify(xfer);
                                if (r < 0)
-                                       goto ERROR;
+                                       return r;
 
                                // Save the payload
                                r = pakfire_xfer_save(xfer);
                                if (r < 0)
-                                       goto ERROR;
+                                       return r;
                                break;
 
 #ifdef CURL_HAS_WEBSOCKETS
                        case PAKFIRE_XFER_SOCKET:
                                r = pakfire_xfer_done_socket(xfer, code);
                                if (r < 0)
-                                       goto ERROR;
+                                       return r;
 #endif /* CURL_HAS_WEBSOCKETS */
 
                        default:
@@ -1419,15 +1419,10 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code)
                // Report that something went wrong
                r = pakfire_xfer_fail(xfer, code);
                if (r < 0)
-                       goto ERROR;
+                       return r;
        }
 
-ERROR:
-       // Remove the xfer from the queue
-       if (xfer->client)
-               pakfire_httpclient_dequeue_xfer(xfer->client, xfer);
-
-       return r;
+       return 0;
 }
 
 static int pakfire_xfer_update(void* data,