]> git.ipfire.org Git - pakfire.git/commitdiff
httpclient: Implement parallel transfers with sd-events
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 10 Aug 2024 17:33:41 +0000 (17:33 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 10 Aug 2024 17:33:41 +0000 (17:33 +0000)
This patch will create a new event loop for each HTTP client which will
be used to connect to cURL.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/libpakfire/httpclient.c

index 69d795361dcdb7f356bddc540e986277861433fc..4ec33229a4f2c16516e5e9de13dda08390ecf1cc 100644 (file)
@@ -24,6 +24,8 @@
 
 #include <curl/curl.h>
 
+#include <systemd/sd-event.h>
+
 #include <pakfire/httpclient.h>
 #include <pakfire/logging.h>
 #include <pakfire/progress.h>
@@ -42,7 +44,14 @@ struct pakfire_httpclient {
        struct pakfire_ctx* ctx;
        int nrefs;
 
-       unsigned int parallel;
+       // Event Loop
+       sd_event* loop;
+       int still_running;
+
+       // Timer
+       sd_event_source* timer;
+
+       int parallel;
 
        // cURL share handle
        CURLSH* share;
@@ -55,6 +64,314 @@ struct pakfire_httpclient {
        TAILQ_HEAD(xfers_running, pakfire_xfer_element) xfers_running;
 };
 
+static struct pakfire_xfer_element* pakfire_httpclient_xfer_create(struct pakfire_xfer* xfer) {
+       struct pakfire_xfer_element* x = NULL;
+
+       // Allocate a new element
+       x = calloc(1, sizeof(*x));
+       if (!x)
+               return NULL;
+
+       // Store a reference to the xfer
+       x->xfer = pakfire_xfer_ref(xfer);
+
+       return x;
+}
+
+static void pakfire_httpclient_xfer_free(struct pakfire_xfer_element* x) {
+       if (x->xfer)
+               pakfire_xfer_unref(x->xfer);
+
+       free(x);
+}
+
+static int pakfire_httpclient_start_transfers(
+               struct pakfire_httpclient* client, struct pakfire_progress* progress) {
+       struct pakfire_xfer_element* x = NULL;
+       int r;
+
+       // Keep running until we have reached our ceiling
+       while (client->still_running < client->parallel) {
+               // We are done if there are no more transfers in the queue
+               if (TAILQ_EMPTY(&client->xfers_queued))
+                       break;
+
+               // Fetch the next transfer
+               x = TAILQ_LAST(&client->xfers_queued, xfers_queued);
+               TAILQ_REMOVE(&client->xfers_queued, x, nodes);
+
+               // Prepare the xfer
+               r = pakfire_xfer_prepare(x->xfer, progress, 0);
+               if (r)
+                       goto ERROR;
+
+               // Add the handle to cURL
+               r = curl_multi_add_handle(client->curl, pakfire_xfer_handle(x->xfer));
+               if (r) {
+                       CTX_ERROR(client->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
+                       goto ERROR;
+               }
+
+               TAILQ_INSERT_TAIL(&client->xfers_running, x, nodes);
+       }
+
+       return 0;
+
+ERROR:
+       pakfire_httpclient_xfer_free(x);
+
+       return r;
+}
+
+static struct pakfire_xfer_element* pakfire_httpclient_xfer_find_running(
+               struct pakfire_httpclient* client, struct pakfire_xfer* xfer) {
+       struct pakfire_xfer_element* x = NULL;
+
+       TAILQ_FOREACH(x, &client->xfers_running, nodes) {
+               if (x->xfer == xfer)
+                       return x;
+       }
+
+       return NULL;
+}
+
+static int pakfire_httpclient_check(struct pakfire_httpclient* client) {
+       struct pakfire_xfer_element* x = NULL;
+       struct pakfire_xfer* xfer = NULL;
+       int r;
+
+       CURLMsg* msg = NULL;
+       int msgs_left = 0;
+
+       for (;;) {
+               // Read the next message
+               msg = curl_multi_info_read(client->curl, &msgs_left);
+               if (!msg)
+                       break;
+
+               switch (msg->msg) {
+                       case CURLMSG_DONE:
+                               // Update reference to transfer
+                               curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer);
+
+                               // Remove the handle
+                               curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer));
+
+                               // Find the matching xfer element
+                               x = pakfire_httpclient_xfer_find_running(client, xfer);
+
+                               // Remove the transfer from the running list
+                               if (x)
+                                       TAILQ_REMOVE(&client->xfers_running, x, nodes);
+
+                               // Call the done callback
+                               r = pakfire_xfer_done(xfer, msg->data.result);
+                               switch (-r) {
+                                       // If we are asked to try again we will re-queue the transfer
+                                       case EAGAIN:
+                                               if (x)
+                                                       TAILQ_INSERT_TAIL(&client->xfers_queued, x, nodes);
+                                               break;
+
+                                       // Otherwise this transfer has finished
+                                       default:
+                                               if (x)
+                                                       pakfire_httpclient_xfer_free(x);
+                                               if (r)
+                                                       return r;
+                                               break;
+                               }
+
+                               // Reset transfer
+                               xfer = NULL;
+                               break;
+
+                       default:
+                               CTX_ERROR(client->ctx, "Received unhandled cURL message %u\n", msg->msg);
+                               break;
+               }
+       }
+
+       return 0;
+}
+
+static int __pakfire_httpclient_timer(sd_event_source* s, uint64_t usec, void* data) {
+       struct pakfire_httpclient* client = data;
+       int r;
+
+       CTX_DEBUG(client->ctx, "cURL timer fired\n");
+
+       r = curl_multi_socket_action(client->curl, CURL_SOCKET_TIMEOUT, 0, &client->still_running);
+       if (r)
+               return r;
+
+       // Check for any messages
+       r = pakfire_httpclient_check(client);
+       if (r)
+               return r;
+
+       return 0;
+}
+
+static int pakfire_httpclient_timer(CURLM* multi, long timeout_ms, void* data) {
+       struct pakfire_httpclient* client = data;
+       int r;
+
+       // A negative value indicates that we should remove the timer
+       if (timeout_ms < 0) {
+               if (client->timer) {
+                       r = sd_event_source_set_enabled(client->timer, SD_EVENT_OFF);
+                       if (r < 0)
+                               CTX_ERROR(client->ctx, "Could not disarm the timer: %s\n", strerror(-r));
+
+                       return 0;
+               }
+       }
+
+       // Set the timer
+       r = sd_event_source_set_time_relative(client->timer, timeout_ms * 1000);
+       if (r < 0) {
+               CTX_ERROR(client->ctx, "Could not set timer: %s\n", strerror(-r));
+
+               return r;
+       }
+
+       // Have the timer fire once
+       r = sd_event_source_set_enabled(client->timer, SD_EVENT_ONESHOT);
+       if (r < 0) {
+               CTX_ERROR(client->ctx, "Could not enable the timer: %s\n", strerror(-r));
+
+               return r;
+       }
+
+       CTX_DEBUG(client->ctx, "cURL set a timer for %ldms\n", timeout_ms);
+
+       return 0;
+}
+
+static int __pakfire_httpclient_socket(sd_event_source* s, int fd, uint32_t events, void* data) {
+       struct pakfire_httpclient* client = data;
+       int r;
+
+       int action = 0;
+
+       if (events & EPOLLIN)
+               action |= CURL_CSELECT_IN;
+
+       if (events & EPOLLOUT)
+               action |= CURL_CSELECT_OUT;
+
+       //CTX_DEBUG(client->ctx, "cURL has activity on socket %d\n", fd);
+
+       // Inform cURL about some socket activity
+       r = curl_multi_socket_action(client->curl, fd, action, &client->still_running);
+       if (r)
+               return r;
+
+       // Check for any messages
+       r = pakfire_httpclient_check(client);
+       if (r)
+               return r;
+
+       // Disarm the timer
+       if (client->still_running <= 0) {
+               if (client->timer) {
+                       r = sd_event_source_set_enabled(client->timer, SD_EVENT_OFF);
+                       if (r < 0) {
+                               CTX_ERROR(client->ctx, "Could not disarm the timer: %s\n", strerror(-r));
+
+                               return r;
+                       }
+               }
+       }
+
+       return 0;
+}
+
+static int pakfire_httpclient_socket(CURL* e, curl_socket_t fd, int what, void* data, void* data2) {
+       struct pakfire_httpclient* client = data;
+       sd_event_source* s = data2;
+       uint32_t events = 0;
+       int r;
+
+       // Remove the socket?
+       if (what == CURL_POLL_REMOVE) {
+               // Disable the event
+               r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
+               if (r < 0)
+                       CTX_ERROR(client->ctx, "Could not disable fd %d: %s\n", fd, strerror(-r));
+
+               CTX_DEBUG(client->ctx, "cURL deregistered socket %d\n", fd);
+
+               return r;
+       }
+
+       // Do we need to read from this socket?
+       if (what & CURL_POLL_IN)
+               events |= EPOLLIN;
+
+       // Do we need to write to this socket?
+       if (what & CURL_POLL_OUT)
+               events |= EPOLLOUT;
+
+       // Change events?
+       if (s) {
+               r = sd_event_source_set_io_events(s, events);
+               if (r < 0) {
+                       CTX_ERROR(client->ctx, "Could not change events for socket %d: %s\n",
+                               fd, strerror(-r));
+
+                       return r;
+               }
+
+               CTX_DEBUG(client->ctx, "cURL changed socket %d\n", fd);
+
+               return 0;
+       }
+
+       // Add the socket to the event loop
+       r = sd_event_add_io(client->loop, &s, fd, events, __pakfire_httpclient_socket, client);
+       if (r < 0) {
+               CTX_ERROR(client->ctx, "Could not register socket %d: %s\n", fd, strerror(-r));
+
+               goto ERROR;
+       }
+
+       // Store the event source
+       curl_multi_assign(client->curl, fd, sd_event_source_ref(s));
+
+       CTX_DEBUG(client->ctx, "cURL registered socket %d\n", fd);
+
+ERROR:
+       if (s)
+               sd_event_source_unref(s);
+
+       return r;
+}
+
+static int pakfire_httpclient_setup_loop(struct pakfire_httpclient* client) {
+       int r;
+
+       // Create a new event loop
+       r = sd_event_new(&client->loop);
+       if (r < 0) {
+               CTX_ERROR(client->ctx, "Could not setup event loop: %s\n", strerror(-r));
+
+               return r;
+       }
+
+       // Create a new timer
+       r = sd_event_add_time_relative(client->loop, &client->timer, CLOCK_MONOTONIC, 0, 0,
+               __pakfire_httpclient_timer, client);
+       if (r < 0) {
+               CTX_ERROR(client->ctx, "Could not set timer: %s\n", strerror(-r));
+
+               return r;
+       }
+
+       return 0;
+}
+
 static int pakfire_httpclient_setup_curl(struct pakfire_httpclient* client) {
        int r;
 
@@ -87,40 +404,40 @@ static int pakfire_httpclient_setup_curl(struct pakfire_httpclient* client) {
                return r;
        }
 
-       return 0;
-}
-
-static struct pakfire_xfer_element* pakfire_httpclient_xfer_create(struct pakfire_xfer* xfer) {
-       struct pakfire_xfer_element* x = NULL;
+       // Register with the event loop
+       r = curl_multi_setopt(client->curl, CURLMOPT_TIMERFUNCTION, pakfire_httpclient_timer);
+       if (r) {
+               CTX_ERROR(client->ctx, "Could not register the timer function: %s\n",
+                       curl_multi_strerror(r));
 
-       // Allocate a new element
-       x = calloc(1, sizeof(*x));
-       if (!x)
-               return NULL;
+               return r;
+       }
 
-       // Store a reference to the xfer
-       x->xfer = pakfire_xfer_ref(xfer);
+       r = curl_multi_setopt(client->curl, CURLMOPT_TIMERDATA, client);
+       if (r) {
+               CTX_ERROR(client->ctx, "Could not register the timer data: %s\n",
+                       curl_multi_strerror(r));
 
-       return x;
-}
+               return r;
+       }
 
-static struct pakfire_xfer_element* pakfire_httpclient_xfer_find_running(
-               struct pakfire_httpclient* client, struct pakfire_xfer* xfer) {
-       struct pakfire_xfer_element* x = NULL;
+       r = curl_multi_setopt(client->curl, CURLMOPT_SOCKETFUNCTION, pakfire_httpclient_socket);
+       if (r) {
+               CTX_ERROR(client->ctx, "Could not register the socket function: %s\n",
+                       curl_multi_strerror(r));
 
-       TAILQ_FOREACH(x, &client->xfers_running, nodes) {
-               if (x->xfer == xfer)
-                       return x;
+               return r;
        }
 
-       return NULL;
-}
+       r = curl_multi_setopt(client->curl, CURLMOPT_SOCKETDATA, client);
+       if (r) {
+               CTX_ERROR(client->ctx, "Could not register the socket data: %s\n",
+                       curl_multi_strerror(r));
 
-static void pakfire_httpclient_xfer_free(struct pakfire_xfer_element* x) {
-       if (x->xfer)
-               pakfire_xfer_unref(x->xfer);
+               return r;
+       }
 
-       free(x);
+       return 0;
 }
 
 static void pakfire_httpclient_free(struct pakfire_httpclient* client) {
@@ -146,6 +463,10 @@ static void pakfire_httpclient_free(struct pakfire_httpclient* client) {
                curl_share_cleanup(client->share);
        if (client->curl)
                curl_multi_cleanup(client->curl);
+       if (client->timer)
+               sd_event_source_unref(client->timer);
+       if (client->loop)
+               sd_event_unref(client->loop);
        if (client->ctx)
                pakfire_ctx_unref(client->ctx);
        free(client);
@@ -179,6 +500,11 @@ int pakfire_httpclient_create(struct pakfire_httpclient** client, struct pakfire
        TAILQ_INIT(&c->xfers_queued);
        TAILQ_INIT(&c->xfers_running);
 
+       // Setup event loop
+       r = pakfire_httpclient_setup_loop(c);
+       if (r)
+               goto ERROR;
+
        // Setup cURL
        r = pakfire_httpclient_setup_curl(c);
        if (r)
@@ -262,49 +588,8 @@ static unsigned int pakfire_httpclient_total_queued_xfers(struct pakfire_httpcli
        return counter;
 }
 
-static int pakfire_httpclient_start_transfers(struct pakfire_httpclient* client,
-               struct pakfire_progress* progress, unsigned int* running_transfers) {
-       struct pakfire_xfer_element* x = NULL;
-       int r;
-
-       // Keep running until we have reached our ceiling
-       while (*running_transfers < client->parallel) {
-               // We are done if there are no more transfers in the queue
-               if (TAILQ_EMPTY(&client->xfers_queued))
-                       break;
-
-               // Fetch the next transfer
-               x = TAILQ_LAST(&client->xfers_queued, xfers_queued);
-               TAILQ_REMOVE(&client->xfers_queued, x, nodes);
-
-               // Prepare the xfer
-               r = pakfire_xfer_prepare(x->xfer, progress, 0);
-               if (r)
-                       goto ERROR;
-
-               // Add the handle to cURL
-               r = curl_multi_add_handle(client->curl, pakfire_xfer_handle(x->xfer));
-               if (r) {
-                       CTX_ERROR(client->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
-                       goto ERROR;
-               }
-
-               TAILQ_INSERT_TAIL(&client->xfers_running, x, nodes);
-               (*running_transfers)++;
-       }
-
-       return 0;
-
-ERROR:
-       pakfire_httpclient_xfer_free(x);
-
-       return r;
-}
-
 int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title) {
        struct pakfire_progress* progress = NULL;
-       struct pakfire_xfer* xfer = NULL;
-       unsigned int running_xfers = 0;
        int progress_flags =
                PAKFIRE_PROGRESS_SHOW_PERCENTAGE |
                PAKFIRE_PROGRESS_SHOW_ETA;
@@ -312,10 +597,6 @@ int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title)
 
        struct pakfire_xfer_element* x = NULL;
 
-       CURLMsg* msg = NULL;
-       int still_running;
-       int msgs_left = -1;
-
        // Fetch the total downloadsize
        const size_t downloadsize = pakfire_httpclient_total_downloadsize(client);
 
@@ -349,75 +630,20 @@ int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title)
        if (r)
                goto ERROR;
 
+       // Make sure that we have up to parallel transfers active
+       r = pakfire_httpclient_start_transfers(client, progress);
+       if (r)
+               goto ERROR;
+
+       // Run the event loop
        do {
-               // Make sure that we have up to parallel transfers active
-               if (!r) {
-                       r = pakfire_httpclient_start_transfers(client, progress, &running_xfers);
-                       if (r)
-                               goto ERROR;
-               }
+               r = sd_event_run(client->loop, -1);
+               if (r < 0) {
+                       CTX_ERROR(client->ctx, "Event loop failed: %s\n", strerror(-r));
 
-               // Run cURL
-               r = curl_multi_perform(client->curl, &still_running);
-               if (r) {
-                       CTX_ERROR(client->ctx, "cURL error: %s\n", curl_easy_strerror(r));
                        goto ERROR;
                }
-
-               for (;;) {
-                       // Read the next message
-                       msg = curl_multi_info_read(client->curl, &msgs_left);
-                       if (!msg)
-                               break;
-
-                       switch (msg->msg) {
-                               case CURLMSG_DONE:
-                                       // Update reference to transfer
-                                       curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer);
-
-                                       // Remove the handle
-                                       curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer));
-
-                                       // Find the matching xfer element
-                                       x = pakfire_httpclient_xfer_find_running(client, xfer);
-
-                                       // Remove the transfer from the running list
-                                       if (x)
-                                               TAILQ_REMOVE(&client->xfers_running, x, nodes);
-                                       running_xfers--;
-
-                                       // Call the done callback
-                                       r = pakfire_xfer_done(xfer, msg->data.result);
-                                       switch (-r) {
-                                               // If we are asked to try again we will re-queue the transfer
-                                               case EAGAIN:
-                                                       if (x)
-                                                               TAILQ_INSERT_TAIL(&client->xfers_queued, x, nodes);
-                                                       break;
-
-                                               // Otherwise this transfer has finished
-                                               default:
-                                                       if (x)
-                                                               pakfire_httpclient_xfer_free(x);
-                                                       if (r)
-                                                               goto ERROR;
-                                                       break;
-                                       }
-
-                                       // Reset transfer
-                                       xfer = NULL;
-                                       break;
-
-                               default:
-                                       CTX_ERROR(client->ctx, "Received unhandled cURL message %u\n", msg->msg);
-                                       break;
-                       }
-               }
-
-               // Wait a little before going through the loop again
-               if (still_running)
-                       curl_multi_wait(client->curl, NULL, 0, 100, NULL);
-       } while (still_running || !TAILQ_EMPTY(&client->xfers_queued));
+       } while (client->still_running > 0);
 
        // We are finished!
        r = pakfire_progress_finish(progress);