From: Michael Tremer Date: Sat, 10 Aug 2024 17:33:41 +0000 (+0000) Subject: httpclient: Implement parallel transfers with sd-events X-Git-Tag: 0.9.30~1218 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=43f334c2ba67562aea092fc576d8f2ce10ff6e52;p=pakfire.git httpclient: Implement parallel transfers with sd-events This patch will create a new event loop for each HTTP client which will be used to connect to cURL. Signed-off-by: Michael Tremer --- diff --git a/src/libpakfire/httpclient.c b/src/libpakfire/httpclient.c index 69d795361..4ec33229a 100644 --- a/src/libpakfire/httpclient.c +++ b/src/libpakfire/httpclient.c @@ -24,6 +24,8 @@ #include +#include + #include #include #include @@ -42,7 +44,14 @@ struct pakfire_httpclient { struct pakfire_ctx* ctx; int nrefs; - unsigned int parallel; + // Event Loop + sd_event* loop; + int still_running; + + // Timer + sd_event_source* timer; + + int parallel; // cURL share handle CURLSH* share; @@ -55,6 +64,314 @@ struct pakfire_httpclient { TAILQ_HEAD(xfers_running, pakfire_xfer_element) xfers_running; }; +static struct pakfire_xfer_element* pakfire_httpclient_xfer_create(struct pakfire_xfer* xfer) { + struct pakfire_xfer_element* x = NULL; + + // Allocate a new element + x = calloc(1, sizeof(*x)); + if (!x) + return NULL; + + // Store a reference to the xfer + x->xfer = pakfire_xfer_ref(xfer); + + return x; +} + +static void pakfire_httpclient_xfer_free(struct pakfire_xfer_element* x) { + if (x->xfer) + pakfire_xfer_unref(x->xfer); + + free(x); +} + +static int pakfire_httpclient_start_transfers( + struct pakfire_httpclient* client, struct pakfire_progress* progress) { + struct pakfire_xfer_element* x = NULL; + int r; + + // Keep running until we have reached our ceiling + while (client->still_running < client->parallel) { + // We are done if there are no more transfers in the queue + if (TAILQ_EMPTY(&client->xfers_queued)) + break; + + // Fetch the next transfer + x = TAILQ_LAST(&client->xfers_queued, xfers_queued); + TAILQ_REMOVE(&client->xfers_queued, x, nodes); + + // Prepare the xfer + r = pakfire_xfer_prepare(x->xfer, progress, 0); + if (r) + goto ERROR; + + // Add the handle to cURL + r = curl_multi_add_handle(client->curl, pakfire_xfer_handle(x->xfer)); + if (r) { + CTX_ERROR(client->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r)); + goto ERROR; + } + + TAILQ_INSERT_TAIL(&client->xfers_running, x, nodes); + } + + return 0; + +ERROR: + pakfire_httpclient_xfer_free(x); + + return r; +} + +static struct pakfire_xfer_element* pakfire_httpclient_xfer_find_running( + struct pakfire_httpclient* client, struct pakfire_xfer* xfer) { + struct pakfire_xfer_element* x = NULL; + + TAILQ_FOREACH(x, &client->xfers_running, nodes) { + if (x->xfer == xfer) + return x; + } + + return NULL; +} + +static int pakfire_httpclient_check(struct pakfire_httpclient* client) { + struct pakfire_xfer_element* x = NULL; + struct pakfire_xfer* xfer = NULL; + int r; + + CURLMsg* msg = NULL; + int msgs_left = 0; + + for (;;) { + // Read the next message + msg = curl_multi_info_read(client->curl, &msgs_left); + if (!msg) + break; + + switch (msg->msg) { + case CURLMSG_DONE: + // Update reference to transfer + curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer); + + // Remove the handle + curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer)); + + // Find the matching xfer element + x = pakfire_httpclient_xfer_find_running(client, xfer); + + // Remove the transfer from the running list + if (x) + TAILQ_REMOVE(&client->xfers_running, x, nodes); + + // Call the done callback + r = pakfire_xfer_done(xfer, msg->data.result); + switch (-r) { + // If we are asked to try again we will re-queue the transfer + case EAGAIN: + if (x) + TAILQ_INSERT_TAIL(&client->xfers_queued, x, nodes); + break; + + // Otherwise this transfer has finished + default: + if (x) + pakfire_httpclient_xfer_free(x); + if (r) + return r; + break; + } + + // Reset transfer + xfer = NULL; + break; + + default: + CTX_ERROR(client->ctx, "Received unhandled cURL message %u\n", msg->msg); + break; + } + } + + return 0; +} + +static int __pakfire_httpclient_timer(sd_event_source* s, uint64_t usec, void* data) { + struct pakfire_httpclient* client = data; + int r; + + CTX_DEBUG(client->ctx, "cURL timer fired\n"); + + r = curl_multi_socket_action(client->curl, CURL_SOCKET_TIMEOUT, 0, &client->still_running); + if (r) + return r; + + // Check for any messages + r = pakfire_httpclient_check(client); + if (r) + return r; + + return 0; +} + +static int pakfire_httpclient_timer(CURLM* multi, long timeout_ms, void* data) { + struct pakfire_httpclient* client = data; + int r; + + // A negative value indicates that we should remove the timer + if (timeout_ms < 0) { + if (client->timer) { + r = sd_event_source_set_enabled(client->timer, SD_EVENT_OFF); + if (r < 0) + CTX_ERROR(client->ctx, "Could not disarm the timer: %s\n", strerror(-r)); + + return 0; + } + } + + // Set the timer + r = sd_event_source_set_time_relative(client->timer, timeout_ms * 1000); + if (r < 0) { + CTX_ERROR(client->ctx, "Could not set timer: %s\n", strerror(-r)); + + return r; + } + + // Have the timer fire once + r = sd_event_source_set_enabled(client->timer, SD_EVENT_ONESHOT); + if (r < 0) { + CTX_ERROR(client->ctx, "Could not enable the timer: %s\n", strerror(-r)); + + return r; + } + + CTX_DEBUG(client->ctx, "cURL set a timer for %ldms\n", timeout_ms); + + return 0; +} + +static int __pakfire_httpclient_socket(sd_event_source* s, int fd, uint32_t events, void* data) { + struct pakfire_httpclient* client = data; + int r; + + int action = 0; + + if (events & EPOLLIN) + action |= CURL_CSELECT_IN; + + if (events & EPOLLOUT) + action |= CURL_CSELECT_OUT; + + //CTX_DEBUG(client->ctx, "cURL has activity on socket %d\n", fd); + + // Inform cURL about some socket activity + r = curl_multi_socket_action(client->curl, fd, action, &client->still_running); + if (r) + return r; + + // Check for any messages + r = pakfire_httpclient_check(client); + if (r) + return r; + + // Disarm the timer + if (client->still_running <= 0) { + if (client->timer) { + r = sd_event_source_set_enabled(client->timer, SD_EVENT_OFF); + if (r < 0) { + CTX_ERROR(client->ctx, "Could not disarm the timer: %s\n", strerror(-r)); + + return r; + } + } + } + + return 0; +} + +static int pakfire_httpclient_socket(CURL* e, curl_socket_t fd, int what, void* data, void* data2) { + struct pakfire_httpclient* client = data; + sd_event_source* s = data2; + uint32_t events = 0; + int r; + + // Remove the socket? + if (what == CURL_POLL_REMOVE) { + // Disable the event + r = sd_event_source_set_enabled(s, SD_EVENT_OFF); + if (r < 0) + CTX_ERROR(client->ctx, "Could not disable fd %d: %s\n", fd, strerror(-r)); + + CTX_DEBUG(client->ctx, "cURL deregistered socket %d\n", fd); + + return r; + } + + // Do we need to read from this socket? + if (what & CURL_POLL_IN) + events |= EPOLLIN; + + // Do we need to write to this socket? + if (what & CURL_POLL_OUT) + events |= EPOLLOUT; + + // Change events? + if (s) { + r = sd_event_source_set_io_events(s, events); + if (r < 0) { + CTX_ERROR(client->ctx, "Could not change events for socket %d: %s\n", + fd, strerror(-r)); + + return r; + } + + CTX_DEBUG(client->ctx, "cURL changed socket %d\n", fd); + + return 0; + } + + // Add the socket to the event loop + r = sd_event_add_io(client->loop, &s, fd, events, __pakfire_httpclient_socket, client); + if (r < 0) { + CTX_ERROR(client->ctx, "Could not register socket %d: %s\n", fd, strerror(-r)); + + goto ERROR; + } + + // Store the event source + curl_multi_assign(client->curl, fd, sd_event_source_ref(s)); + + CTX_DEBUG(client->ctx, "cURL registered socket %d\n", fd); + +ERROR: + if (s) + sd_event_source_unref(s); + + return r; +} + +static int pakfire_httpclient_setup_loop(struct pakfire_httpclient* client) { + int r; + + // Create a new event loop + r = sd_event_new(&client->loop); + if (r < 0) { + CTX_ERROR(client->ctx, "Could not setup event loop: %s\n", strerror(-r)); + + return r; + } + + // Create a new timer + r = sd_event_add_time_relative(client->loop, &client->timer, CLOCK_MONOTONIC, 0, 0, + __pakfire_httpclient_timer, client); + if (r < 0) { + CTX_ERROR(client->ctx, "Could not set timer: %s\n", strerror(-r)); + + return r; + } + + return 0; +} + static int pakfire_httpclient_setup_curl(struct pakfire_httpclient* client) { int r; @@ -87,40 +404,40 @@ static int pakfire_httpclient_setup_curl(struct pakfire_httpclient* client) { return r; } - return 0; -} - -static struct pakfire_xfer_element* pakfire_httpclient_xfer_create(struct pakfire_xfer* xfer) { - struct pakfire_xfer_element* x = NULL; + // Register with the event loop + r = curl_multi_setopt(client->curl, CURLMOPT_TIMERFUNCTION, pakfire_httpclient_timer); + if (r) { + CTX_ERROR(client->ctx, "Could not register the timer function: %s\n", + curl_multi_strerror(r)); - // Allocate a new element - x = calloc(1, sizeof(*x)); - if (!x) - return NULL; + return r; + } - // Store a reference to the xfer - x->xfer = pakfire_xfer_ref(xfer); + r = curl_multi_setopt(client->curl, CURLMOPT_TIMERDATA, client); + if (r) { + CTX_ERROR(client->ctx, "Could not register the timer data: %s\n", + curl_multi_strerror(r)); - return x; -} + return r; + } -static struct pakfire_xfer_element* pakfire_httpclient_xfer_find_running( - struct pakfire_httpclient* client, struct pakfire_xfer* xfer) { - struct pakfire_xfer_element* x = NULL; + r = curl_multi_setopt(client->curl, CURLMOPT_SOCKETFUNCTION, pakfire_httpclient_socket); + if (r) { + CTX_ERROR(client->ctx, "Could not register the socket function: %s\n", + curl_multi_strerror(r)); - TAILQ_FOREACH(x, &client->xfers_running, nodes) { - if (x->xfer == xfer) - return x; + return r; } - return NULL; -} + r = curl_multi_setopt(client->curl, CURLMOPT_SOCKETDATA, client); + if (r) { + CTX_ERROR(client->ctx, "Could not register the socket data: %s\n", + curl_multi_strerror(r)); -static void pakfire_httpclient_xfer_free(struct pakfire_xfer_element* x) { - if (x->xfer) - pakfire_xfer_unref(x->xfer); + return r; + } - free(x); + return 0; } static void pakfire_httpclient_free(struct pakfire_httpclient* client) { @@ -146,6 +463,10 @@ static void pakfire_httpclient_free(struct pakfire_httpclient* client) { curl_share_cleanup(client->share); if (client->curl) curl_multi_cleanup(client->curl); + if (client->timer) + sd_event_source_unref(client->timer); + if (client->loop) + sd_event_unref(client->loop); if (client->ctx) pakfire_ctx_unref(client->ctx); free(client); @@ -179,6 +500,11 @@ int pakfire_httpclient_create(struct pakfire_httpclient** client, struct pakfire TAILQ_INIT(&c->xfers_queued); TAILQ_INIT(&c->xfers_running); + // Setup event loop + r = pakfire_httpclient_setup_loop(c); + if (r) + goto ERROR; + // Setup cURL r = pakfire_httpclient_setup_curl(c); if (r) @@ -262,49 +588,8 @@ static unsigned int pakfire_httpclient_total_queued_xfers(struct pakfire_httpcli return counter; } -static int pakfire_httpclient_start_transfers(struct pakfire_httpclient* client, - struct pakfire_progress* progress, unsigned int* running_transfers) { - struct pakfire_xfer_element* x = NULL; - int r; - - // Keep running until we have reached our ceiling - while (*running_transfers < client->parallel) { - // We are done if there are no more transfers in the queue - if (TAILQ_EMPTY(&client->xfers_queued)) - break; - - // Fetch the next transfer - x = TAILQ_LAST(&client->xfers_queued, xfers_queued); - TAILQ_REMOVE(&client->xfers_queued, x, nodes); - - // Prepare the xfer - r = pakfire_xfer_prepare(x->xfer, progress, 0); - if (r) - goto ERROR; - - // Add the handle to cURL - r = curl_multi_add_handle(client->curl, pakfire_xfer_handle(x->xfer)); - if (r) { - CTX_ERROR(client->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r)); - goto ERROR; - } - - TAILQ_INSERT_TAIL(&client->xfers_running, x, nodes); - (*running_transfers)++; - } - - return 0; - -ERROR: - pakfire_httpclient_xfer_free(x); - - return r; -} - int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title) { struct pakfire_progress* progress = NULL; - struct pakfire_xfer* xfer = NULL; - unsigned int running_xfers = 0; int progress_flags = PAKFIRE_PROGRESS_SHOW_PERCENTAGE | PAKFIRE_PROGRESS_SHOW_ETA; @@ -312,10 +597,6 @@ int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title) struct pakfire_xfer_element* x = NULL; - CURLMsg* msg = NULL; - int still_running; - int msgs_left = -1; - // Fetch the total downloadsize const size_t downloadsize = pakfire_httpclient_total_downloadsize(client); @@ -349,75 +630,20 @@ int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title) if (r) goto ERROR; + // Make sure that we have up to parallel transfers active + r = pakfire_httpclient_start_transfers(client, progress); + if (r) + goto ERROR; + + // Run the event loop do { - // Make sure that we have up to parallel transfers active - if (!r) { - r = pakfire_httpclient_start_transfers(client, progress, &running_xfers); - if (r) - goto ERROR; - } + r = sd_event_run(client->loop, -1); + if (r < 0) { + CTX_ERROR(client->ctx, "Event loop failed: %s\n", strerror(-r)); - // Run cURL - r = curl_multi_perform(client->curl, &still_running); - if (r) { - CTX_ERROR(client->ctx, "cURL error: %s\n", curl_easy_strerror(r)); goto ERROR; } - - for (;;) { - // Read the next message - msg = curl_multi_info_read(client->curl, &msgs_left); - if (!msg) - break; - - switch (msg->msg) { - case CURLMSG_DONE: - // Update reference to transfer - curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer); - - // Remove the handle - curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer)); - - // Find the matching xfer element - x = pakfire_httpclient_xfer_find_running(client, xfer); - - // Remove the transfer from the running list - if (x) - TAILQ_REMOVE(&client->xfers_running, x, nodes); - running_xfers--; - - // Call the done callback - r = pakfire_xfer_done(xfer, msg->data.result); - switch (-r) { - // If we are asked to try again we will re-queue the transfer - case EAGAIN: - if (x) - TAILQ_INSERT_TAIL(&client->xfers_queued, x, nodes); - break; - - // Otherwise this transfer has finished - default: - if (x) - pakfire_httpclient_xfer_free(x); - if (r) - goto ERROR; - break; - } - - // Reset transfer - xfer = NULL; - break; - - default: - CTX_ERROR(client->ctx, "Received unhandled cURL message %u\n", msg->msg); - break; - } - } - - // Wait a little before going through the loop again - if (still_running) - curl_multi_wait(client->curl, NULL, 0, 100, NULL); - } while (still_running || !TAILQ_EMPTY(&client->xfers_queued)); + } while (client->still_running > 0); // We are finished! r = pakfire_progress_finish(progress);