From: Michael Tremer Date: Wed, 25 Jun 2025 09:02:36 +0000 (+0000) Subject: client: Implement asynchronous uploads X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=c3813226c9b0402e281bc58acdb8ac675124e492;p=pakfire.git client: Implement asynchronous uploads Signed-off-by: Michael Tremer --- diff --git a/src/cli/lib/client-build.c b/src/cli/lib/client-build.c index c2a2a26a..c21c73c0 100644 --- a/src/cli/lib/client-build.c +++ b/src/cli/lib/client-build.c @@ -114,6 +114,7 @@ int cli_client_build(void* data, int argc, char* argv[]) { if (r < 0) goto ERROR; +#if 0 // Upload all packages for (unsigned int i = 0; i < local_args.num_packages; i++) { r = pakfire_client_upload(client, local_args.packages[i], NULL, &upload); @@ -123,6 +124,7 @@ int cli_client_build(void* data, int argc, char* argv[]) { // Store the upload ID local_args.uploads[local_args.num_uploads++] = upload; } +#endif // No uploads if (!local_args.num_uploads) diff --git a/src/cli/lib/upload_create.c b/src/cli/lib/upload_create.c index 53f695c1..64198a6b 100644 --- a/src/cli/lib/upload_create.c +++ b/src/cli/lib/upload_create.c @@ -55,11 +55,39 @@ static error_t parse(int key, char* arg, struct argp_state* state, void* data) { return 0; } +static int upload_callback(struct pakfire_client* client, + pakfire_client_upload_status status, const char* uuid, void* data) { + switch (status) { + case PAKFIRE_CLIENT_UPLOAD_SUCCESSFUL: + printf("Successfully created upload %s\n", uuid); + return 0; + + default: + break; + } + + return 0; +} + +static int auth_callback(struct pakfire_client* client, + pakfire_client_auth_status status, void* data) { + const struct cli_local_args* local_args = data; + int r; + + // Create all uploads + for (unsigned int i = 0; i < local_args->num_files; i++) { + r = pakfire_client_upload(client, local_args->files[i], NULL, upload_callback, NULL); + if (r < 0) + return r; + } + + return 0; +} + int cli_upload_create(void* data, int argc, char* argv[]) { struct cli_global_args* global_args = data; struct cli_local_args local_args = {}; struct pakfire_client* client = NULL; - char* uuid = NULL; int r; // Parse the command line @@ -72,17 +100,13 @@ int cli_upload_create(void* data, int argc, char* argv[]) { if (r < 0) goto ERROR; - // List uploads - for (unsigned int i = 0; i < local_args.num_files; i++) { - r = pakfire_client_upload(client, local_args.files[i], NULL, &uuid); - if (r) - goto ERROR; + // Set authentication callback + r = pakfire_client_set_auth_callback(client, auth_callback, &local_args); + if (r < 0) + goto ERROR; - if (uuid) { - printf("Uploaded %s as %s\n", local_args.files[i], uuid); - free(uuid); - } - } + // Run the client + r = pakfire_client_run(client); ERROR: if (client) diff --git a/src/pakfire/client.c b/src/pakfire/client.c index deeb948d..5fe8553b 100644 --- a/src/pakfire/client.c +++ b/src/pakfire/client.c @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -40,6 +41,32 @@ #include #include +struct pakfire_client_upload { + STAILQ_ENTRY(pakfire_client_upload) nodes; + + // Client + struct pakfire_client* client; + + // UUID + char uuid[UUID_STR_LEN]; + + // Filename + char filename[NAME_MAX]; + + // Stat Result + struct stat stat; + + // File Handle + FILE* f; + + // Hashes + struct pakfire_hashes hashes; + + // Callback + pakfire_client_upload_callback callback; + void* data; +}; + struct pakfire_client { struct pakfire_ctx* ctx; int nrefs; @@ -66,6 +93,9 @@ struct pakfire_client { pakfire_client_auth_callback callback; void* data; } auth; + + // Uploads + STAILQ_HEAD(uploads, pakfire_client_upload) uploads; }; static int pakfire_client_xfer_create(struct pakfire_xfer** xfer, @@ -228,6 +258,9 @@ int pakfire_client_create(struct pakfire_client** client, if (r < 0) goto ERROR; + // Initialize uploads + STAILQ_INIT(&self->uploads); + // Setup the authentication timer r = sd_event_add_time_relative(self->loop, &self->auth.timer, CLOCK_MONOTONIC, 0, 0, pakfire_client_auth_timer, self); @@ -542,125 +575,124 @@ ERROR: // Uploads -static int pakfire_client_create_upload(struct pakfire_client* self, - const char* path, const char* filename, FILE* f, char** uuid) { - struct pakfire_hashes hashes = {}; - struct pakfire_xfer* xfer = NULL; - struct json_object* response = NULL; - struct json_object* request = NULL; - char* hexdigest_blake2b512 = NULL; - struct stat stat; - int r; +static void pakfire_client_upload_free(struct pakfire_client_upload* upload) { + // Close the file handle + if (upload->f) + fclose(upload->f); - const int fd = fileno(f); + free(upload); +} - // Stat the file - r = fstat(fd, &stat); - if (r) { - ERROR(self->ctx, "Could not stat %s: %s\n", path, strerror(errno)); +static int pakfire_client_upload_create(struct pakfire_client_upload** upload, + struct pakfire_client* client, const char* path, const char* filename, + pakfire_client_upload_callback callback, void* data) { + struct pakfire_client_upload* self = NULL; + char basename[NAME_MAX]; + int r; + + // Allocate a new object + self = calloc(1, sizeof(*self)); + if (!self) { r = -errno; goto ERROR; } - // Compute the digest - r = pakfire_hash_file(self->ctx, f, PAKFIRE_HASH_BLAKE2B512, &hashes); - if (r < 0) { - ERROR(self->ctx, "Could not compute the checksum of %s: %s\n", - path, strerror(-r)); - goto ERROR; - } + // Reference the client + self->client = client; - // Convert the digest into hex format - r = pakfire_hashes_get_hex(&hashes, PAKFIRE_HASH_BLAKE2B512, &hexdigest_blake2b512); + // Compute the basename + r = pakfire_path_basename(basename, path); if (r < 0) goto ERROR; - // Create a new xfer - r = pakfire_client_xfer_create(&xfer, self, "/api/v1/uploads"); - if (r < 0) - goto ERROR; + // Set the basename as default filename + if (!filename) + filename = basename; - // Enable authentication - r = pakfire_client_xfer_auth(self, xfer); + // Store the filename + r = pakfire_string_set(self->filename, filename); if (r < 0) goto ERROR; - // Make the request - r = pakfire_json_new_object(&request); - if (r < 0) + // Open the source file + self->f = fopen(path, "r"); + if (!self->f) { + ERROR(client->ctx, "Could not open file for upload %s: %m\n", path); + r = -errno; goto ERROR; + } - // Add the filename parameter - r = pakfire_json_add_string(request, "filename", filename); - if (r < 0) - goto ERROR; + // Fetch the file descriptor + const int fd = fileno(self->f); - // Add the size parameter - r = pakfire_json_add_int64(request, "size", stat.st_size); - if (r < 0) + // Stat the file + r = fstat(fd, &self->stat); + if (r) { + ERROR(client->ctx, "Could not stat %s: %m\n", path); + r = -errno; goto ERROR; + } - // Add the hexdigest parameter - r = pakfire_json_add_string(request, "hexdigest_blake2b512", hexdigest_blake2b512); - if (r < 0) + // Compute the digest + r = pakfire_hash_file(client->ctx, self->f, PAKFIRE_HASH_BLAKE2B512, &self->hashes); + if (r < 0) { + ERROR(client->ctx, "Could not compute the checksum of %s: %s\n", path, strerror(-r)); goto ERROR; + } - // Send the request - r = pakfire_xfer_run_api_request(xfer, request, &response); - if (r < 0) - goto ERROR; + // Store the callback + self->callback = callback; + self->data = data; - // Fetch the ID - if (uuid) { - r = pakfire_client_api_response_string(self, response, "uuid", uuid); - if (r < 0) - goto ERROR; - } + // Append to the list + STAILQ_INSERT_TAIL(&client->uploads, self, nodes); - // Success - r = 0; + // Return a pointer to the upload + *upload = self; + + return 0; ERROR: - if (xfer) - pakfire_xfer_unref(xfer); - if (response) - json_object_put(response); - if (request) - json_object_put(request); - if (hexdigest_blake2b512) - free(hexdigest_blake2b512); + if (self) + pakfire_client_upload_free(self); return r; } -static int pakfire_client_upload_payload(struct pakfire_client* self, - const char* filename, const char* uuid, FILE* f) { +static void pakfire_client_upload_remove(struct pakfire_client* self, struct pakfire_client_upload* upload) { + STAILQ_REMOVE(&self->uploads, upload, pakfire_client_upload, nodes); +} + +static int pakfire_client_upload_payload(struct pakfire_client_upload* upload) { + struct pakfire_client* self = upload->client; struct pakfire_xfer* xfer = NULL; int r; // Create a new xfer - r = pakfire_client_xfer_create(&xfer, self, "/api/v1/uploads/%s", uuid); + r = pakfire_client_xfer_create(&xfer, self, "/api/v1/uploads/%s", upload->uuid); if (r < 0) goto ERROR; // Set the title - r = pakfire_xfer_set_title(xfer, filename); + r = pakfire_xfer_set_title(xfer, upload->filename); if (r < 0) goto ERROR; // Enable authentication r = pakfire_client_xfer_auth(self, xfer); - if (r) + if (r < 0) goto ERROR; // Set source file - r = pakfire_xfer_set_input(xfer, f); - if (r) + r = pakfire_xfer_set_input(xfer, upload->f); + if (r < 0) goto ERROR; + // XXX Set the callback + // Send the request - r = pakfire_xfer_run_api_request(xfer, NULL, NULL); - if (r) + r = pakfire_httpclient_enqueue(self->httpclient, xfer); + if (r < 0) goto ERROR; ERROR: @@ -670,53 +702,107 @@ ERROR: return r; } -int pakfire_client_upload(struct pakfire_client* self, - const char* path, const char* filename, char** uuid) { - char basename[NAME_MAX]; - FILE* f = NULL; +static int pakfire_client_upload_response(struct pakfire_xfer* xfer, + pakfire_xfer_error_code_t code, struct json_object* response, void* data) { + struct pakfire_client_upload* self = data; + const char* uuid = NULL; int r; - // Compute the basename - r = pakfire_path_basename(basename, path); - if (r) - goto ERROR; + switch (code) { + case PAKFIRE_XFER_OK: + // Fetch the UUID from the response + r = pakfire_json_get_string(response, "uuid", &uuid); + if (r < 0) + return r; - // Set the basename as default filename - if (!filename) - filename = basename; + // Store the UUID + r = pakfire_string_set(self->uuid, uuid); + if (r < 0) + return r; - // Open the source file - f = fopen(path, "r"); - if (!f) { - ERROR(self->ctx, "Could not open file for upload %s: %m\n", path); - return -errno; + // Upload the payload + return pakfire_client_upload_payload(self); + + // XXX Handle any errors + default: + break; } + return -EINVAL; +} + +int pakfire_client_upload(struct pakfire_client* self, + const char* path, const char* filename, pakfire_client_upload_callback callback, void* data) { + struct pakfire_client_upload* upload = NULL; + struct json_object* request = NULL; + char* hexdigest_blake2b512 = NULL; + struct pakfire_xfer* xfer = NULL; + int r; + // Create a new upload - r = pakfire_client_create_upload(self, path, filename, f, uuid); + r = pakfire_client_upload_create(&upload, self, path, filename, callback, data); if (r < 0) { - ERROR(self->ctx, "Failed to create upload: %s\n", strerror(-r)); + ERROR(self->ctx, "Failed to create a new upload: %s\n", strerror(-r)); goto ERROR; } - DEBUG(self->ctx, "Created a new upload (%s)\n", *uuid); + // Convert the digest into hex format + r = pakfire_hashes_get_hex(&upload->hashes, PAKFIRE_HASH_BLAKE2B512, &hexdigest_blake2b512); + if (r < 0) + goto ERROR; - // Send the payload - r = pakfire_client_upload_payload(self, filename, *uuid, f); - if (r < 0) { - ERROR(self->ctx, "Failed to upload the payload for %s: %s\n", *uuid, strerror(-r)); + // Create a new xfer + r = pakfire_client_xfer_create(&xfer, self, "/api/v1/uploads"); + if (r < 0) goto ERROR; - } -ERROR: - if (r) { - if (*uuid) - free(*uuid); + // Enable authentication + r = pakfire_client_xfer_auth(self, xfer); + if (r < 0) + goto ERROR; - *uuid = NULL; - } - if (f) - fclose(f); + // Make the request + r = pakfire_json_new_object(&request); + if (r < 0) + goto ERROR; + + // Add the filename parameter + r = pakfire_json_add_string(request, "filename", upload->filename); + if (r < 0) + goto ERROR; + + // Add the size parameter + r = pakfire_json_add_int64(request, "size", upload->stat.st_size); + if (r < 0) + goto ERROR; + + // Add the hexdigest parameter + r = pakfire_json_add_string(request, "hexdigest_blake2b512", hexdigest_blake2b512); + if (r < 0) + goto ERROR; + + // Set the request body + r = pakfire_xfer_set_json_payload(xfer, request); + if (r < 0) + goto ERROR; + + // Register the callback + r = pakfire_xfer_set_response_callback(xfer, pakfire_client_upload_response, upload); + if (r < 0) + goto ERROR; + + // Enqueue the transfer + r = pakfire_httpclient_enqueue(self->httpclient, xfer); + if (r < 0) + goto ERROR; + +ERROR: + if (hexdigest_blake2b512) + free(hexdigest_blake2b512); + if (request) + json_object_put(request); + if (xfer) + pakfire_xfer_unref(xfer); return r; } diff --git a/src/pakfire/client.h b/src/pakfire/client.h index db94e2e3..c35d9f28 100644 --- a/src/pakfire/client.h +++ b/src/pakfire/client.h @@ -67,8 +67,16 @@ int pakfire_client_build(struct pakfire_client* client, const char* upload, // Uploads +typedef enum { + PAKFIRE_CLIENT_UPLOAD_SUCCESSFUL, + PAKFIRE_CLIENT_UPLOAD_ERROR, +} pakfire_client_upload_status; + +typedef int (*pakfire_client_upload_callback) + (struct pakfire_client* client, pakfire_client_upload_status status, const char* uuid, void* data); + int pakfire_client_upload(struct pakfire_client* client, - const char* path, const char* filename, char** uuid); + const char* path, const char* filename, pakfire_client_upload_callback callback, void* data); int pakfire_client_list_uploads(struct pakfire_client* client, struct json_object** uploads); int pakfire_client_delete_upload(struct pakfire_client* client, const char* uuid); diff --git a/src/pakfire/job.c b/src/pakfire/job.c index d1d9d87b..f1a480d2 100644 --- a/src/pakfire/job.c +++ b/src/pakfire/job.c @@ -321,12 +321,14 @@ static int pakfire_job_finished(struct pakfire_job* job, int status) { goto ERROR; } +#if 0 // Upload the log file r = pakfire_client_upload(job->client, path, filename, &logfile); if (r < 0) { ERROR(job->ctx, "Could not upload the log file: %s\n", strerror(-r)); goto ERROR; } +#endif // Create a new xfer r = pakfire_job_xfer_create(&xfer, job, "/api/v1/jobs/%s/finished", job->id); @@ -458,12 +460,14 @@ static int pakfire_job_result(struct pakfire_ctx* ctx, struct pakfire* pakfire, goto ERROR; } +#if 0 // Upload the file r = pakfire_client_upload(job->client, path, filename, &uuid); if (r < 0) { ERROR(job->ctx, "Could not upload %s: %s\n", nevra, strerror(-r)); goto ERROR; } +#endif // Store the ID of the upload r = pakfire_strings_append(&job->uploads, uuid);