From da24f937032f17f72c6d3c1606225e8cb29617da Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Sat, 1 Feb 2025 13:40:03 +0000 Subject: [PATCH] httpclient: Refactor the queue management It does not seem like a very good idea to give cURL all the handles at the same time and let it sort it all out. Therefore, this is yet another attempt to implement some half-decent queueing to implement parallel downloads. Fixes: #13812 - httpclient: Bring back a queue for transfers Signed-off-by: Michael Tremer --- src/pakfire/httpclient.c | 227 ++++++++++++++++++++++++++++++--------- src/pakfire/httpclient.h | 2 - src/pakfire/xfer.c | 27 ++--- 3 files changed, 185 insertions(+), 71 deletions(-) diff --git a/src/pakfire/httpclient.c b/src/pakfire/httpclient.c index fc1cb27a..9e66ad72 100644 --- a/src/pakfire/httpclient.c +++ b/src/pakfire/httpclient.c @@ -31,6 +31,7 @@ #include #include #include +#include // The number of concurrent downloads #define DEFAULT_MAX_PARALLEL 4 @@ -39,6 +40,13 @@ struct pakfire_httpclient_xfer { TAILQ_ENTRY(pakfire_httpclient_xfer) nodes; struct pakfire_xfer* xfer; + + // Store the status + enum pakfire_httpclient_xfer_status { + PAKFIRE_XFER_QUEUED, + PAKFIRE_XFER_RUNNING, + PAKFIRE_XFER_FINISHED, + } status; }; struct pakfire_httpclient { @@ -64,7 +72,7 @@ struct pakfire_httpclient { struct pakfire_progress* progress; // How many transfers in parallel? - long max_parallel; + long unsigned max_parallel; // cURL multi handle CURLM* curl; @@ -95,6 +103,9 @@ static int pakfire_httpclient_xfer_create( // Store a reference to the xfer e->xfer = pakfire_xfer_ref(xfer); + // Start as queued + e->status = PAKFIRE_XFER_QUEUED; + // Return the pointer *x = e; @@ -107,6 +118,41 @@ static void pakfire_httpclient_xfer_free(struct pakfire_httpclient_xfer* x) { free(x); } +static size_t pakfire_httpclient_count_xfers( + struct pakfire_httpclient* self, const enum pakfire_httpclient_xfer_status status) { + struct pakfire_httpclient_xfer* e = NULL; + size_t counter = 0; + + TAILQ_FOREACH(e, &self->xfers, nodes) { + if (e->status == status) + counter++; + } + + return counter; +} + +static size_t pakfire_httpclient_num_running_xfers(struct pakfire_httpclient* self) { + return pakfire_httpclient_count_xfers(self, PAKFIRE_XFER_RUNNING); +} + +static size_t pakfire_httpclient_num_queued_xfers(struct pakfire_httpclient* self) { + return pakfire_httpclient_count_xfers(self, PAKFIRE_XFER_QUEUED); +} + +/* + Returns the next queued xfer or NULL +*/ +static struct pakfire_httpclient_xfer* pakfire_httpclient_get_queued_xfer(struct pakfire_httpclient* self) { + struct pakfire_httpclient_xfer* e = NULL; + + TAILQ_FOREACH(e, &self->xfers, nodes) { + if (e->status == PAKFIRE_XFER_QUEUED) + return e; + } + + return NULL; +} + static struct pakfire_httpclient_xfer* pakfire_httpclient_xfer_find( struct pakfire_httpclient* self, struct pakfire_xfer* xfer) { struct pakfire_httpclient_xfer* e = NULL; @@ -119,7 +165,77 @@ static struct pakfire_httpclient_xfer* pakfire_httpclient_xfer_find( return NULL; } + +static int pakfire_httpclient_remove( + struct pakfire_httpclient* self, struct pakfire_httpclient_xfer* e) { + int r; + + // Nothing to do if not running + switch (e->status) { + case PAKFIRE_XFER_RUNNING: + break; + + case PAKFIRE_XFER_QUEUED: + case PAKFIRE_XFER_FINISHED: + return 0; + } + + // Remove the handle + r = curl_multi_remove_handle(self->curl, pakfire_xfer_handle(e->xfer)); + if (r) { + ERROR(self->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r)); + return -ENOTSUP; + } + + // Mark as finished + e->status = PAKFIRE_XFER_FINISHED; + + return 0; +} + +static int pakfire_httpclient_launch_one( + struct pakfire_httpclient* self, struct pakfire_httpclient_xfer* e) { + int r; + + // Prepare the xfer + r = pakfire_xfer_prepare(e->xfer, self->progress, 0); + if (r < 0) + return r; + + // Add the handle to cURL + r = curl_multi_add_handle(self->curl, pakfire_xfer_handle(e->xfer)); + if (r) { + ERROR(self->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r)); + return -ENOTSUP; + } + + // Mark as running + e->status = PAKFIRE_XFER_RUNNING; + + return 0; +} + +static int pakfire_httpclient_launch(struct pakfire_httpclient* self) { + struct pakfire_httpclient_xfer* e = NULL; + int r; + + while (pakfire_httpclient_num_running_xfers(self) < self->max_parallel) { + // Fetch the next queued xfer + e = pakfire_httpclient_get_queued_xfer(self); + if (!e) + break; + + // Launch it + r = pakfire_httpclient_launch_one(self, e); + if (r < 0) + return r; + } + + return 0; +} + static int pakfire_httpclient_check(struct pakfire_httpclient* self) { + struct pakfire_httpclient_xfer* e = NULL; struct pakfire_xfer* xfer = NULL; int r; @@ -137,25 +253,40 @@ static int pakfire_httpclient_check(struct pakfire_httpclient* self) { // Update reference to transfer curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer); - // Call the done callback - r = pakfire_xfer_done(xfer, msg->data.result); - switch (-r) { - // If we are asked to try again we will re-queue the transfer - case EAGAIN: - r = pakfire_httpclient_enqueue_xfer(self, xfer); - if (r) - return r; + // Find the queue element + e = pakfire_httpclient_xfer_find(self, xfer); + if (!e) { + ERROR(self->ctx, "Could not find queued transfer\n"); + return -EINVAL; + } - break; + // Remove the handle + r = pakfire_httpclient_remove(self, e); + if (r < 0) + return r; - // Otherwise this transfer has finished - default: - if (r) + // Call the done callback + r = pakfire_xfer_done(xfer, msg->data.result); + if (r < 0) { + switch (-r) { + case EAGAIN: + // Launch it again + r = pakfire_httpclient_launch_one(self, e); + if (r < 0) + return r; + + // Move to the next message + continue; + + // Raise any errors + default: return r; - - break; + } } + // Remove the xfer from the queue + pakfire_httpclient_dequeue_xfer(self, xfer); + // Reset transfer xfer = NULL; break; @@ -166,6 +297,11 @@ static int pakfire_httpclient_check(struct pakfire_httpclient* self) { } } + // Launch some more transfers + r = pakfire_httpclient_launch(self); + if (r < 0) + return r; + // If we control the loop, terminate it if there are no more transfers left if (pakfire_httpclient_has_flag(self, PAKFIRE_HTTPCLIENT_OWN_LOOP)) { if (self->still_running <= 0) @@ -557,48 +693,30 @@ int pakfire_httpclient_enqueue_xfer(struct pakfire_httpclient* self, if (r < 0) return r; - // Prepare the xfer - r = pakfire_xfer_prepare(xfer, self->progress, 0); - if (r < 0) - goto ERROR; - - // Increment the xfer counter - self->total_xfers++; - // Update the total download size self->total_downloadsize += pakfire_xfer_get_size(xfer); - // Add the handle to cURL - r = curl_multi_add_handle(self->curl, pakfire_xfer_handle(xfer)); - if (r) { - ERROR(self->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r)); - goto ERROR; - } - // Keep a reference to the xfer TAILQ_INSERT_TAIL(&self->xfers, e, nodes); // Transfer enqueued return pakfire_xfer_enqueued(xfer, self); - -ERROR: - if (e) - pakfire_httpclient_xfer_free(e); - - return r; } int pakfire_httpclient_dequeue_xfer(struct pakfire_httpclient* self, struct pakfire_xfer* xfer) { struct pakfire_httpclient_xfer* e = NULL; + int r; // Find reference e = pakfire_httpclient_xfer_find(self, xfer); if (!e) return 0; - // Decrement the xfers counter - self->total_xfers--; + // Remove the transfer + r = pakfire_httpclient_remove(self, e); + if (r < 0) + return r; // Reduce the total download size self->total_downloadsize -= pakfire_xfer_get_size(xfer); @@ -609,25 +727,11 @@ int pakfire_httpclient_dequeue_xfer(struct pakfire_httpclient* self, return 0; } -int pakfire_httpclient_remove_xfer(struct pakfire_httpclient* self, - struct pakfire_xfer* xfer) { - int r; - - // Remove the handle - r = curl_multi_remove_handle(self->curl, pakfire_xfer_handle(xfer)); - if (r) { - ERROR(self->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r)); - return -ENOTSUP; - } - - return 0; -} - int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title) { int r = 0; // Cannot run without any transfers - if (!self->total_xfers) { + if (TAILQ_EMPTY(&self->xfers)) { DEBUG(self->ctx, "Skipping running HTTP client without any transfers\n"); return 0; } @@ -642,6 +746,11 @@ int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title) { if (r) goto ERROR; + // Launch some transfers + r = pakfire_httpclient_launch(self); + if (r < 0) + goto ERROR; + // Run the event loop r = sd_event_loop(self->loop); if (r < 0) { @@ -649,6 +758,18 @@ int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title) { goto ERROR; } + if (pakfire_httpclient_num_running_xfers(self)) { + ERROR(self->ctx, "HTTP client ended with running transfers\n"); + r = -ECANCELED; + goto ERROR; + } + + if (pakfire_httpclient_num_queued_xfers(self)) { + ERROR(self->ctx, "Not all queued items have been downloaded\n"); + r = -ECANCELED; + goto ERROR; + } + ERROR: // We are finished! pakfire_progress_finish(self->progress); diff --git a/src/pakfire/httpclient.h b/src/pakfire/httpclient.h index 962f86a4..730f7b4d 100644 --- a/src/pakfire/httpclient.h +++ b/src/pakfire/httpclient.h @@ -41,8 +41,6 @@ int pakfire_httpclient_enqueue_xfer( struct pakfire_httpclient* self, struct pakfire_xfer* xfer); int pakfire_httpclient_dequeue_xfer( struct pakfire_httpclient* self, struct pakfire_xfer* xfer); -int pakfire_httpclient_remove_xfer( - struct pakfire_httpclient* self, struct pakfire_xfer* xfer); int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title); diff --git a/src/pakfire/xfer.c b/src/pakfire/xfer.c index c6e9bae9..ac637519 100644 --- a/src/pakfire/xfer.c +++ b/src/pakfire/xfer.c @@ -147,7 +147,7 @@ static void pakfire_xfer_free(struct pakfire_xfer* xfer) { // Remove the cURL handle if (xfer->client) - pakfire_httpclient_remove_xfer(xfer->client, xfer); + pakfire_httpclient_dequeue_xfer(xfer->client, xfer); // systemd if (xfer->event) @@ -1323,7 +1323,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) if (xfer->progress) { r = pakfire_progress_finish(xfer->progress); if (r < 0) - goto ERROR; + return r; } // Log the result @@ -1356,7 +1356,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) // Download Size r = curl_easy_getinfo(h, CURLINFO_SIZE_DOWNLOAD_T, &download_size); if (r) - goto ERROR; + return r; if (download_size) DEBUG(xfer->ctx, " Download Size: %ld bytes\n", download_size); @@ -1364,7 +1364,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) // Download Speed r = curl_easy_getinfo(h, CURLINFO_SPEED_DOWNLOAD_T, &download_speed); if (r) - goto ERROR; + return r; if (download_speed) DEBUG(xfer->ctx, " Download Speed: %ld bps\n", download_speed); @@ -1372,7 +1372,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) // Upload Size r = curl_easy_getinfo(h, CURLINFO_SIZE_UPLOAD_T, &upload_size); if (r) - goto ERROR; + return r; if (upload_size) DEBUG(xfer->ctx, " Upload Size: %ld bytes\n", upload_size); @@ -1380,7 +1380,7 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) // Upload Speed r = curl_easy_getinfo(h, CURLINFO_SPEED_UPLOAD_T, &upload_speed); if (r) - goto ERROR; + return r; if (upload_speed) DEBUG(xfer->ctx, " Upload Speed: %ld bps\n", upload_speed); @@ -1392,19 +1392,19 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) // Verify the received payload r = pakfire_xfer_verify(xfer); if (r < 0) - goto ERROR; + return r; // Save the payload r = pakfire_xfer_save(xfer); if (r < 0) - goto ERROR; + return r; break; #ifdef CURL_HAS_WEBSOCKETS case PAKFIRE_XFER_SOCKET: r = pakfire_xfer_done_socket(xfer, code); if (r < 0) - goto ERROR; + return r; #endif /* CURL_HAS_WEBSOCKETS */ default: @@ -1419,15 +1419,10 @@ pakfire_xfer_error_code_t pakfire_xfer_done(struct pakfire_xfer* xfer, int code) // Report that something went wrong r = pakfire_xfer_fail(xfer, code); if (r < 0) - goto ERROR; + return r; } -ERROR: - // Remove the xfer from the queue - if (xfer->client) - pakfire_httpclient_dequeue_xfer(xfer->client, xfer); - - return r; + return 0; } static int pakfire_xfer_update(void* data, -- 2.39.5