#include <curl/curl.h>
+#include <systemd/sd-event.h>
+
#include <pakfire/httpclient.h>
#include <pakfire/logging.h>
#include <pakfire/progress.h>
struct pakfire_ctx* ctx;
int nrefs;
- unsigned int parallel;
+ // Event Loop
+ sd_event* loop;
+ int still_running;
+
+ // Timer
+ sd_event_source* timer;
+
+ int parallel;
// cURL share handle
CURLSH* share;
TAILQ_HEAD(xfers_running, pakfire_xfer_element) xfers_running;
};
+static struct pakfire_xfer_element* pakfire_httpclient_xfer_create(struct pakfire_xfer* xfer) {
+ struct pakfire_xfer_element* x = NULL;
+
+ // Allocate a new element
+ x = calloc(1, sizeof(*x));
+ if (!x)
+ return NULL;
+
+ // Store a reference to the xfer
+ x->xfer = pakfire_xfer_ref(xfer);
+
+ return x;
+}
+
+static void pakfire_httpclient_xfer_free(struct pakfire_xfer_element* x) {
+ if (x->xfer)
+ pakfire_xfer_unref(x->xfer);
+
+ free(x);
+}
+
+static int pakfire_httpclient_start_transfers(
+ struct pakfire_httpclient* client, struct pakfire_progress* progress) {
+ struct pakfire_xfer_element* x = NULL;
+ int r;
+
+ // Keep running until we have reached our ceiling
+ while (client->still_running < client->parallel) {
+ // We are done if there are no more transfers in the queue
+ if (TAILQ_EMPTY(&client->xfers_queued))
+ break;
+
+ // Fetch the next transfer
+ x = TAILQ_LAST(&client->xfers_queued, xfers_queued);
+ TAILQ_REMOVE(&client->xfers_queued, x, nodes);
+
+ // Prepare the xfer
+ r = pakfire_xfer_prepare(x->xfer, progress, 0);
+ if (r)
+ goto ERROR;
+
+ // Add the handle to cURL
+ r = curl_multi_add_handle(client->curl, pakfire_xfer_handle(x->xfer));
+ if (r) {
+ CTX_ERROR(client->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
+ goto ERROR;
+ }
+
+ TAILQ_INSERT_TAIL(&client->xfers_running, x, nodes);
+ }
+
+ return 0;
+
+ERROR:
+ pakfire_httpclient_xfer_free(x);
+
+ return r;
+}
+
+static struct pakfire_xfer_element* pakfire_httpclient_xfer_find_running(
+ struct pakfire_httpclient* client, struct pakfire_xfer* xfer) {
+ struct pakfire_xfer_element* x = NULL;
+
+ TAILQ_FOREACH(x, &client->xfers_running, nodes) {
+ if (x->xfer == xfer)
+ return x;
+ }
+
+ return NULL;
+}
+
+static int pakfire_httpclient_check(struct pakfire_httpclient* client) {
+ struct pakfire_xfer_element* x = NULL;
+ struct pakfire_xfer* xfer = NULL;
+ int r;
+
+ CURLMsg* msg = NULL;
+ int msgs_left = 0;
+
+ for (;;) {
+ // Read the next message
+ msg = curl_multi_info_read(client->curl, &msgs_left);
+ if (!msg)
+ break;
+
+ switch (msg->msg) {
+ case CURLMSG_DONE:
+ // Update reference to transfer
+ curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer);
+
+ // Remove the handle
+ curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer));
+
+ // Find the matching xfer element
+ x = pakfire_httpclient_xfer_find_running(client, xfer);
+
+ // Remove the transfer from the running list
+ if (x)
+ TAILQ_REMOVE(&client->xfers_running, x, nodes);
+
+ // Call the done callback
+ r = pakfire_xfer_done(xfer, msg->data.result);
+ switch (-r) {
+ // If we are asked to try again we will re-queue the transfer
+ case EAGAIN:
+ if (x)
+ TAILQ_INSERT_TAIL(&client->xfers_queued, x, nodes);
+ break;
+
+ // Otherwise this transfer has finished
+ default:
+ if (x)
+ pakfire_httpclient_xfer_free(x);
+ if (r)
+ return r;
+ break;
+ }
+
+ // Reset transfer
+ xfer = NULL;
+ break;
+
+ default:
+ CTX_ERROR(client->ctx, "Received unhandled cURL message %u\n", msg->msg);
+ break;
+ }
+ }
+
+ return 0;
+}
+
+static int __pakfire_httpclient_timer(sd_event_source* s, uint64_t usec, void* data) {
+ struct pakfire_httpclient* client = data;
+ int r;
+
+ CTX_DEBUG(client->ctx, "cURL timer fired\n");
+
+ r = curl_multi_socket_action(client->curl, CURL_SOCKET_TIMEOUT, 0, &client->still_running);
+ if (r)
+ return r;
+
+ // Check for any messages
+ r = pakfire_httpclient_check(client);
+ if (r)
+ return r;
+
+ return 0;
+}
+
+static int pakfire_httpclient_timer(CURLM* multi, long timeout_ms, void* data) {
+ struct pakfire_httpclient* client = data;
+ int r;
+
+ // A negative value indicates that we should remove the timer
+ if (timeout_ms < 0) {
+ if (client->timer) {
+ r = sd_event_source_set_enabled(client->timer, SD_EVENT_OFF);
+ if (r < 0)
+ CTX_ERROR(client->ctx, "Could not disarm the timer: %s\n", strerror(-r));
+
+ return 0;
+ }
+ }
+
+ // Set the timer
+ r = sd_event_source_set_time_relative(client->timer, timeout_ms * 1000);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Could not set timer: %s\n", strerror(-r));
+
+ return r;
+ }
+
+ // Have the timer fire once
+ r = sd_event_source_set_enabled(client->timer, SD_EVENT_ONESHOT);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Could not enable the timer: %s\n", strerror(-r));
+
+ return r;
+ }
+
+ CTX_DEBUG(client->ctx, "cURL set a timer for %ldms\n", timeout_ms);
+
+ return 0;
+}
+
+static int __pakfire_httpclient_socket(sd_event_source* s, int fd, uint32_t events, void* data) {
+ struct pakfire_httpclient* client = data;
+ int r;
+
+ int action = 0;
+
+ if (events & EPOLLIN)
+ action |= CURL_CSELECT_IN;
+
+ if (events & EPOLLOUT)
+ action |= CURL_CSELECT_OUT;
+
+ //CTX_DEBUG(client->ctx, "cURL has activity on socket %d\n", fd);
+
+ // Inform cURL about some socket activity
+ r = curl_multi_socket_action(client->curl, fd, action, &client->still_running);
+ if (r)
+ return r;
+
+ // Check for any messages
+ r = pakfire_httpclient_check(client);
+ if (r)
+ return r;
+
+ // Disarm the timer
+ if (client->still_running <= 0) {
+ if (client->timer) {
+ r = sd_event_source_set_enabled(client->timer, SD_EVENT_OFF);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Could not disarm the timer: %s\n", strerror(-r));
+
+ return r;
+ }
+ }
+ }
+
+ return 0;
+}
+
+static int pakfire_httpclient_socket(CURL* e, curl_socket_t fd, int what, void* data, void* data2) {
+ struct pakfire_httpclient* client = data;
+ sd_event_source* s = data2;
+ uint32_t events = 0;
+ int r;
+
+ // Remove the socket?
+ if (what == CURL_POLL_REMOVE) {
+ // Disable the event
+ r = sd_event_source_set_enabled(s, SD_EVENT_OFF);
+ if (r < 0)
+ CTX_ERROR(client->ctx, "Could not disable fd %d: %s\n", fd, strerror(-r));
+
+ CTX_DEBUG(client->ctx, "cURL deregistered socket %d\n", fd);
+
+ return r;
+ }
+
+ // Do we need to read from this socket?
+ if (what & CURL_POLL_IN)
+ events |= EPOLLIN;
+
+ // Do we need to write to this socket?
+ if (what & CURL_POLL_OUT)
+ events |= EPOLLOUT;
+
+ // Change events?
+ if (s) {
+ r = sd_event_source_set_io_events(s, events);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Could not change events for socket %d: %s\n",
+ fd, strerror(-r));
+
+ return r;
+ }
+
+ CTX_DEBUG(client->ctx, "cURL changed socket %d\n", fd);
+
+ return 0;
+ }
+
+ // Add the socket to the event loop
+ r = sd_event_add_io(client->loop, &s, fd, events, __pakfire_httpclient_socket, client);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Could not register socket %d: %s\n", fd, strerror(-r));
+
+ goto ERROR;
+ }
+
+ // Store the event source
+ curl_multi_assign(client->curl, fd, sd_event_source_ref(s));
+
+ CTX_DEBUG(client->ctx, "cURL registered socket %d\n", fd);
+
+ERROR:
+ if (s)
+ sd_event_source_unref(s);
+
+ return r;
+}
+
+static int pakfire_httpclient_setup_loop(struct pakfire_httpclient* client) {
+ int r;
+
+ // Create a new event loop
+ r = sd_event_new(&client->loop);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Could not setup event loop: %s\n", strerror(-r));
+
+ return r;
+ }
+
+ // Create a new timer
+ r = sd_event_add_time_relative(client->loop, &client->timer, CLOCK_MONOTONIC, 0, 0,
+ __pakfire_httpclient_timer, client);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Could not set timer: %s\n", strerror(-r));
+
+ return r;
+ }
+
+ return 0;
+}
+
static int pakfire_httpclient_setup_curl(struct pakfire_httpclient* client) {
int r;
return r;
}
- return 0;
-}
-
-static struct pakfire_xfer_element* pakfire_httpclient_xfer_create(struct pakfire_xfer* xfer) {
- struct pakfire_xfer_element* x = NULL;
+ // Register with the event loop
+ r = curl_multi_setopt(client->curl, CURLMOPT_TIMERFUNCTION, pakfire_httpclient_timer);
+ if (r) {
+ CTX_ERROR(client->ctx, "Could not register the timer function: %s\n",
+ curl_multi_strerror(r));
- // Allocate a new element
- x = calloc(1, sizeof(*x));
- if (!x)
- return NULL;
+ return r;
+ }
- // Store a reference to the xfer
- x->xfer = pakfire_xfer_ref(xfer);
+ r = curl_multi_setopt(client->curl, CURLMOPT_TIMERDATA, client);
+ if (r) {
+ CTX_ERROR(client->ctx, "Could not register the timer data: %s\n",
+ curl_multi_strerror(r));
- return x;
-}
+ return r;
+ }
-static struct pakfire_xfer_element* pakfire_httpclient_xfer_find_running(
- struct pakfire_httpclient* client, struct pakfire_xfer* xfer) {
- struct pakfire_xfer_element* x = NULL;
+ r = curl_multi_setopt(client->curl, CURLMOPT_SOCKETFUNCTION, pakfire_httpclient_socket);
+ if (r) {
+ CTX_ERROR(client->ctx, "Could not register the socket function: %s\n",
+ curl_multi_strerror(r));
- TAILQ_FOREACH(x, &client->xfers_running, nodes) {
- if (x->xfer == xfer)
- return x;
+ return r;
}
- return NULL;
-}
+ r = curl_multi_setopt(client->curl, CURLMOPT_SOCKETDATA, client);
+ if (r) {
+ CTX_ERROR(client->ctx, "Could not register the socket data: %s\n",
+ curl_multi_strerror(r));
-static void pakfire_httpclient_xfer_free(struct pakfire_xfer_element* x) {
- if (x->xfer)
- pakfire_xfer_unref(x->xfer);
+ return r;
+ }
- free(x);
+ return 0;
}
static void pakfire_httpclient_free(struct pakfire_httpclient* client) {
curl_share_cleanup(client->share);
if (client->curl)
curl_multi_cleanup(client->curl);
+ if (client->timer)
+ sd_event_source_unref(client->timer);
+ if (client->loop)
+ sd_event_unref(client->loop);
if (client->ctx)
pakfire_ctx_unref(client->ctx);
free(client);
TAILQ_INIT(&c->xfers_queued);
TAILQ_INIT(&c->xfers_running);
+ // Setup event loop
+ r = pakfire_httpclient_setup_loop(c);
+ if (r)
+ goto ERROR;
+
// Setup cURL
r = pakfire_httpclient_setup_curl(c);
if (r)
return counter;
}
-static int pakfire_httpclient_start_transfers(struct pakfire_httpclient* client,
- struct pakfire_progress* progress, unsigned int* running_transfers) {
- struct pakfire_xfer_element* x = NULL;
- int r;
-
- // Keep running until we have reached our ceiling
- while (*running_transfers < client->parallel) {
- // We are done if there are no more transfers in the queue
- if (TAILQ_EMPTY(&client->xfers_queued))
- break;
-
- // Fetch the next transfer
- x = TAILQ_LAST(&client->xfers_queued, xfers_queued);
- TAILQ_REMOVE(&client->xfers_queued, x, nodes);
-
- // Prepare the xfer
- r = pakfire_xfer_prepare(x->xfer, progress, 0);
- if (r)
- goto ERROR;
-
- // Add the handle to cURL
- r = curl_multi_add_handle(client->curl, pakfire_xfer_handle(x->xfer));
- if (r) {
- CTX_ERROR(client->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
- goto ERROR;
- }
-
- TAILQ_INSERT_TAIL(&client->xfers_running, x, nodes);
- (*running_transfers)++;
- }
-
- return 0;
-
-ERROR:
- pakfire_httpclient_xfer_free(x);
-
- return r;
-}
-
int pakfire_httpclient_run(struct pakfire_httpclient* client, const char* title) {
struct pakfire_progress* progress = NULL;
- struct pakfire_xfer* xfer = NULL;
- unsigned int running_xfers = 0;
int progress_flags =
PAKFIRE_PROGRESS_SHOW_PERCENTAGE |
PAKFIRE_PROGRESS_SHOW_ETA;
struct pakfire_xfer_element* x = NULL;
- CURLMsg* msg = NULL;
- int still_running;
- int msgs_left = -1;
-
// Fetch the total downloadsize
const size_t downloadsize = pakfire_httpclient_total_downloadsize(client);
if (r)
goto ERROR;
+ // Make sure that we have up to parallel transfers active
+ r = pakfire_httpclient_start_transfers(client, progress);
+ if (r)
+ goto ERROR;
+
+ // Run the event loop
do {
- // Make sure that we have up to parallel transfers active
- if (!r) {
- r = pakfire_httpclient_start_transfers(client, progress, &running_xfers);
- if (r)
- goto ERROR;
- }
+ r = sd_event_run(client->loop, -1);
+ if (r < 0) {
+ CTX_ERROR(client->ctx, "Event loop failed: %s\n", strerror(-r));
- // Run cURL
- r = curl_multi_perform(client->curl, &still_running);
- if (r) {
- CTX_ERROR(client->ctx, "cURL error: %s\n", curl_easy_strerror(r));
goto ERROR;
}
-
- for (;;) {
- // Read the next message
- msg = curl_multi_info_read(client->curl, &msgs_left);
- if (!msg)
- break;
-
- switch (msg->msg) {
- case CURLMSG_DONE:
- // Update reference to transfer
- curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer);
-
- // Remove the handle
- curl_multi_remove_handle(client->curl, pakfire_xfer_handle(xfer));
-
- // Find the matching xfer element
- x = pakfire_httpclient_xfer_find_running(client, xfer);
-
- // Remove the transfer from the running list
- if (x)
- TAILQ_REMOVE(&client->xfers_running, x, nodes);
- running_xfers--;
-
- // Call the done callback
- r = pakfire_xfer_done(xfer, msg->data.result);
- switch (-r) {
- // If we are asked to try again we will re-queue the transfer
- case EAGAIN:
- if (x)
- TAILQ_INSERT_TAIL(&client->xfers_queued, x, nodes);
- break;
-
- // Otherwise this transfer has finished
- default:
- if (x)
- pakfire_httpclient_xfer_free(x);
- if (r)
- goto ERROR;
- break;
- }
-
- // Reset transfer
- xfer = NULL;
- break;
-
- default:
- CTX_ERROR(client->ctx, "Received unhandled cURL message %u\n", msg->msg);
- break;
- }
- }
-
- // Wait a little before going through the loop again
- if (still_running)
- curl_multi_wait(client->curl, NULL, 0, 100, NULL);
- } while (still_running || !TAILQ_EMPTY(&client->xfers_queued));
+ } while (client->still_running > 0);
// We are finished!
r = pakfire_progress_finish(progress);