#include <pakfire/progress.h>
#include <pakfire/string.h>
#include <pakfire/xfer.h>
+#include <pakfire/util.h>
// The number of concurrent downloads
#define DEFAULT_MAX_PARALLEL 4
TAILQ_ENTRY(pakfire_httpclient_xfer) nodes;
struct pakfire_xfer* xfer;
+
+ // Store the status
+ enum pakfire_httpclient_xfer_status {
+ PAKFIRE_XFER_QUEUED,
+ PAKFIRE_XFER_RUNNING,
+ PAKFIRE_XFER_FINISHED,
+ } status;
};
struct pakfire_httpclient {
struct pakfire_progress* progress;
// How many transfers in parallel?
- long max_parallel;
+ long unsigned max_parallel;
// cURL multi handle
CURLM* curl;
// Store a reference to the xfer
e->xfer = pakfire_xfer_ref(xfer);
+ // Start as queued
+ e->status = PAKFIRE_XFER_QUEUED;
+
// Return the pointer
*x = e;
free(x);
}
+static size_t pakfire_httpclient_count_xfers(
+ struct pakfire_httpclient* self, const enum pakfire_httpclient_xfer_status status) {
+ struct pakfire_httpclient_xfer* e = NULL;
+ size_t counter = 0;
+
+ TAILQ_FOREACH(e, &self->xfers, nodes) {
+ if (e->status == status)
+ counter++;
+ }
+
+ return counter;
+}
+
+static size_t pakfire_httpclient_num_running_xfers(struct pakfire_httpclient* self) {
+ return pakfire_httpclient_count_xfers(self, PAKFIRE_XFER_RUNNING);
+}
+
+static size_t pakfire_httpclient_num_queued_xfers(struct pakfire_httpclient* self) {
+ return pakfire_httpclient_count_xfers(self, PAKFIRE_XFER_QUEUED);
+}
+
+/*
+ Returns the next queued xfer or NULL
+*/
+static struct pakfire_httpclient_xfer* pakfire_httpclient_get_queued_xfer(struct pakfire_httpclient* self) {
+ struct pakfire_httpclient_xfer* e = NULL;
+
+ TAILQ_FOREACH(e, &self->xfers, nodes) {
+ if (e->status == PAKFIRE_XFER_QUEUED)
+ return e;
+ }
+
+ return NULL;
+}
+
static struct pakfire_httpclient_xfer* pakfire_httpclient_xfer_find(
struct pakfire_httpclient* self, struct pakfire_xfer* xfer) {
struct pakfire_httpclient_xfer* e = NULL;
return NULL;
}
+
+static int pakfire_httpclient_remove(
+ struct pakfire_httpclient* self, struct pakfire_httpclient_xfer* e) {
+ int r;
+
+ // Nothing to do if not running
+ switch (e->status) {
+ case PAKFIRE_XFER_RUNNING:
+ break;
+
+ case PAKFIRE_XFER_QUEUED:
+ case PAKFIRE_XFER_FINISHED:
+ return 0;
+ }
+
+ // Remove the handle
+ r = curl_multi_remove_handle(self->curl, pakfire_xfer_handle(e->xfer));
+ if (r) {
+ ERROR(self->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r));
+ return -ENOTSUP;
+ }
+
+ // Mark as finished
+ e->status = PAKFIRE_XFER_FINISHED;
+
+ return 0;
+}
+
+static int pakfire_httpclient_launch_one(
+ struct pakfire_httpclient* self, struct pakfire_httpclient_xfer* e) {
+ int r;
+
+ // Prepare the xfer
+ r = pakfire_xfer_prepare(e->xfer, self->progress, 0);
+ if (r < 0)
+ return r;
+
+ // Add the handle to cURL
+ r = curl_multi_add_handle(self->curl, pakfire_xfer_handle(e->xfer));
+ if (r) {
+ ERROR(self->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
+ return -ENOTSUP;
+ }
+
+ // Mark as running
+ e->status = PAKFIRE_XFER_RUNNING;
+
+ return 0;
+}
+
+static int pakfire_httpclient_launch(struct pakfire_httpclient* self) {
+ struct pakfire_httpclient_xfer* e = NULL;
+ int r;
+
+ while (pakfire_httpclient_num_running_xfers(self) < self->max_parallel) {
+ // Fetch the next queued xfer
+ e = pakfire_httpclient_get_queued_xfer(self);
+ if (!e)
+ break;
+
+ // Launch it
+ r = pakfire_httpclient_launch_one(self, e);
+ if (r < 0)
+ return r;
+ }
+
+ return 0;
+}
+
static int pakfire_httpclient_check(struct pakfire_httpclient* self) {
+ struct pakfire_httpclient_xfer* e = NULL;
struct pakfire_xfer* xfer = NULL;
int r;
// Update reference to transfer
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &xfer);
- // Call the done callback
- r = pakfire_xfer_done(xfer, msg->data.result);
- switch (-r) {
- // If we are asked to try again we will re-queue the transfer
- case EAGAIN:
- r = pakfire_httpclient_enqueue_xfer(self, xfer);
- if (r)
- return r;
+ // Find the queue element
+ e = pakfire_httpclient_xfer_find(self, xfer);
+ if (!e) {
+ ERROR(self->ctx, "Could not find queued transfer\n");
+ return -EINVAL;
+ }
- break;
+ // Remove the handle
+ r = pakfire_httpclient_remove(self, e);
+ if (r < 0)
+ return r;
- // Otherwise this transfer has finished
- default:
- if (r)
+ // Call the done callback
+ r = pakfire_xfer_done(xfer, msg->data.result);
+ if (r < 0) {
+ switch (-r) {
+ case EAGAIN:
+ // Launch it again
+ r = pakfire_httpclient_launch_one(self, e);
+ if (r < 0)
+ return r;
+
+ // Move to the next message
+ continue;
+
+ // Raise any errors
+ default:
return r;
-
- break;
+ }
}
+ // Remove the xfer from the queue
+ pakfire_httpclient_dequeue_xfer(self, xfer);
+
// Reset transfer
xfer = NULL;
break;
}
}
+ // Launch some more transfers
+ r = pakfire_httpclient_launch(self);
+ if (r < 0)
+ return r;
+
// If we control the loop, terminate it if there are no more transfers left
if (pakfire_httpclient_has_flag(self, PAKFIRE_HTTPCLIENT_OWN_LOOP)) {
if (self->still_running <= 0)
if (r < 0)
return r;
- // Prepare the xfer
- r = pakfire_xfer_prepare(xfer, self->progress, 0);
- if (r < 0)
- goto ERROR;
-
- // Increment the xfer counter
- self->total_xfers++;
-
// Update the total download size
self->total_downloadsize += pakfire_xfer_get_size(xfer);
- // Add the handle to cURL
- r = curl_multi_add_handle(self->curl, pakfire_xfer_handle(xfer));
- if (r) {
- ERROR(self->ctx, "Adding handle failed: %s\n", curl_multi_strerror(r));
- goto ERROR;
- }
-
// Keep a reference to the xfer
TAILQ_INSERT_TAIL(&self->xfers, e, nodes);
// Transfer enqueued
return pakfire_xfer_enqueued(xfer, self);
-
-ERROR:
- if (e)
- pakfire_httpclient_xfer_free(e);
-
- return r;
}
int pakfire_httpclient_dequeue_xfer(struct pakfire_httpclient* self,
struct pakfire_xfer* xfer) {
struct pakfire_httpclient_xfer* e = NULL;
+ int r;
// Find reference
e = pakfire_httpclient_xfer_find(self, xfer);
if (!e)
return 0;
- // Decrement the xfers counter
- self->total_xfers--;
+ // Remove the transfer
+ r = pakfire_httpclient_remove(self, e);
+ if (r < 0)
+ return r;
// Reduce the total download size
self->total_downloadsize -= pakfire_xfer_get_size(xfer);
return 0;
}
-int pakfire_httpclient_remove_xfer(struct pakfire_httpclient* self,
- struct pakfire_xfer* xfer) {
- int r;
-
- // Remove the handle
- r = curl_multi_remove_handle(self->curl, pakfire_xfer_handle(xfer));
- if (r) {
- ERROR(self->ctx, "Could not remove the handle: %s\n", curl_multi_strerror(r));
- return -ENOTSUP;
- }
-
- return 0;
-}
-
int pakfire_httpclient_run(struct pakfire_httpclient* self, const char* title) {
int r = 0;
// Cannot run without any transfers
- if (!self->total_xfers) {
+ if (TAILQ_EMPTY(&self->xfers)) {
DEBUG(self->ctx, "Skipping running HTTP client without any transfers\n");
return 0;
}
if (r)
goto ERROR;
+ // Launch some transfers
+ r = pakfire_httpclient_launch(self);
+ if (r < 0)
+ goto ERROR;
+
// Run the event loop
r = sd_event_loop(self->loop);
if (r < 0) {
goto ERROR;
}
+ if (pakfire_httpclient_num_running_xfers(self)) {
+ ERROR(self->ctx, "HTTP client ended with running transfers\n");
+ r = -ECANCELED;
+ goto ERROR;
+ }
+
+ if (pakfire_httpclient_num_queued_xfers(self)) {
+ ERROR(self->ctx, "Not all queued items have been downloaded\n");
+ r = -ECANCELED;
+ goto ERROR;
+ }
+
ERROR:
// We are finished!
pakfire_progress_finish(self->progress);
// Remove the cURL handle
if (xfer->client)
- pakfire_httpclient_remove_xfer(xfer->client, xfer);
+ pakfire_httpclient_dequeue_xfer(xfer->client, xfer);
// systemd
if (xfer->event)
if (xfer->progress) {
r = pakfire_progress_finish(xfer->progress);
if (r < 0)
- goto ERROR;
+ return r;
}
// Log the result
// Download Size
r = curl_easy_getinfo(h, CURLINFO_SIZE_DOWNLOAD_T, &download_size);
if (r)
- goto ERROR;
+ return r;
if (download_size)
DEBUG(xfer->ctx, " Download Size: %ld bytes\n", download_size);
// Download Speed
r = curl_easy_getinfo(h, CURLINFO_SPEED_DOWNLOAD_T, &download_speed);
if (r)
- goto ERROR;
+ return r;
if (download_speed)
DEBUG(xfer->ctx, " Download Speed: %ld bps\n", download_speed);
// Upload Size
r = curl_easy_getinfo(h, CURLINFO_SIZE_UPLOAD_T, &upload_size);
if (r)
- goto ERROR;
+ return r;
if (upload_size)
DEBUG(xfer->ctx, " Upload Size: %ld bytes\n", upload_size);
// Upload Speed
r = curl_easy_getinfo(h, CURLINFO_SPEED_UPLOAD_T, &upload_speed);
if (r)
- goto ERROR;
+ return r;
if (upload_speed)
DEBUG(xfer->ctx, " Upload Speed: %ld bps\n", upload_speed);
// Verify the received payload
r = pakfire_xfer_verify(xfer);
if (r < 0)
- goto ERROR;
+ return r;
// Save the payload
r = pakfire_xfer_save(xfer);
if (r < 0)
- goto ERROR;
+ return r;
break;
#ifdef CURL_HAS_WEBSOCKETS
case PAKFIRE_XFER_SOCKET:
r = pakfire_xfer_done_socket(xfer, code);
if (r < 0)
- goto ERROR;
+ return r;
#endif /* CURL_HAS_WEBSOCKETS */
default:
// Report that something went wrong
r = pakfire_xfer_fail(xfer, code);
if (r < 0)
- goto ERROR;
+ return r;
}
-ERROR:
- // Remove the xfer from the queue
- if (xfer->client)
- pakfire_httpclient_dequeue_xfer(xfer->client, xfer);
-
- return r;
+ return 0;
}
static int pakfire_xfer_update(void* data,