From: Michael Tremer Date: Thu, 26 Jun 2025 09:42:44 +0000 (+0000) Subject: builder: Move the job logic from the daemon X-Git-Url: http://git.ipfire.org/gitweb/gitweb.cgi?a=commitdiff_plain;h=a8caa32c5379b3353433bc389840feaf2feb5bf5;p=pakfire.git builder: Move the job logic from the daemon Signed-off-by: Michael Tremer --- diff --git a/src/pakfire/builder.c b/src/pakfire/builder.c index 3c5eae3c..984d7192 100644 --- a/src/pakfire/builder.c +++ b/src/pakfire/builder.c @@ -26,13 +26,17 @@ #include #include #include +#include #include +#include #include #include // Submit stats every 30 seconds #define PAKFIRE_STATS_TIMER S_TO_US(30) +#define MAX_JOBS 64 + struct pakfire_builder { // Context struct pakfire_ctx* ctx; @@ -55,6 +59,10 @@ struct pakfire_builder { // Reconnect Timer & Holdoff Time sd_event_source* reconnect_timer; uint64_t reconnect_holdoff; + + // Jobs + struct pakfire_job* jobs[MAX_JOBS]; + unsigned int running_jobs; }; static void pakfire_builder_free(struct pakfire_builder* self) { @@ -397,6 +405,9 @@ int pakfire_builder_close(struct pakfire_xfer* xfer, int code, void* data) { struct pakfire_builder* self = data; int r; + // Log action + DEBUG(self->ctx, "Builder disconnected\n"); + // Remove the connection from the client r = pakfire_client_builder_disconnected(self->client, xfer); if (r < 0) @@ -436,10 +447,159 @@ int pakfire_builder_close(struct pakfire_xfer* xfer, int code, void* data) { return 0; } -static int pakfire_builder_handle_job_message(struct pakfire_builder* self, const char* job_id, struct json_object* message) { - return -EINVAL; // XXX TODO +// Jobs + +static struct pakfire_job* pakfire_builder_find_job( + struct pakfire_builder* self, const char* job_id) { + // Walk through all jobs + for (unsigned int i = 0; i < MAX_JOBS; i++) { + if (!self->jobs[i]) + continue; + + // Return the matching job + if (pakfire_job_has_id(self->jobs[i], job_id)) + return pakfire_job_ref(self->jobs[i]); + } + + return NULL; +} + +/* + Terminates all running jobs +*/ +int pakfire_builder_terminate_jobs(struct pakfire_builder* self) { + int r; + + for (unsigned int i = 0; i < MAX_JOBS; i++) { + // Skip any empty slots + if (!self->jobs[i]) + continue; + + // Terminate the job + r = pakfire_job_terminate(self->jobs[i], SIGTERM); + if (r) + return r; + } + + return 0; +} + +/* + Called after a job has exited +*/ +int pakfire_builder_job_finished(struct pakfire_builder* self, struct pakfire_job* job) { + int r; + + DEBUG(self->ctx, "Removing job %p\n", job); + + for (unsigned int i = 0; i < MAX_JOBS; i++) { + if (self->jobs[i] != job) + continue; + + // Dereference the job and clear the slot + pakfire_job_unref(self->jobs[i]); + self->jobs[i] = NULL; + + break; + } + + // Now we have one less job running + self->running_jobs--; + +#if 0 + // Release the shutdown inhibition if there are no more jobs running + if (!self->running_jobs) { + r = pakfire_daemon_release_inhibit_shutdown(daemon); + if (r < 0) + return r; + } +#endif + + return 0; +} + +/* + Called when we have received a message for a specific job +*/ +static int pakfire_builder_handle_job_message( + struct pakfire_builder* self, const char* job_id, struct json_object* message) { + struct pakfire_job* job = NULL; + int r; + + // Find the job + job = pakfire_builder_find_job(self, job_id); + if (!job) { + WARN(self->ctx, "Received message for job %s which does not exist. Ignoring.\n", job_id); + return 0; + } + + // Dispatch the message to the job + r = pakfire_job_handle_message(job, message); + if (r < 0) + goto ERROR; + +ERROR: + if (job) + pakfire_job_unref(job); + + return r; +} + +/* + Called when a new job has been received +*/ +static int pakfire_builder_job(struct pakfire_builder* self, json_object* m) { + struct json_object* data = NULL; + struct pakfire_job* job = NULL; + int r; + +#if 0 + // Inhibit shutdown + r = pakfire_daemon_inhibit_shutdown(daemon); + if (r < 0) + goto ERROR; +#endif + + // Fetch the data from the message + if (!json_object_object_get_ex(m, "data", &data)) { + ERROR(self->ctx, "Job does not have any data\n"); + r = -EINVAL; + goto ERROR; + } + + // Create a new job + r = pakfire_job_create(&job, self->ctx, self, data); + if (r) { + ERROR(self->ctx, "Could not create a new job: %s\n", strerror(-r)); + goto ERROR; + } + + // Launch the job + r = pakfire_job_launch(job); + if (r < 0) { + ERROR(self->ctx, "Failed to launch the job: %s\n", strerror(-r)); + goto ERROR; + } + + // Store the job + for (unsigned int i = 0; i < MAX_JOBS; i++) { + if (!self->jobs[i]) { + self->jobs[i] = pakfire_job_ref(job); + break; + } + } + + // Increment the number of running jobs + self->running_jobs++; + +ERROR: + if (job) + pakfire_job_unref(job); + + return r; } + static int pakfire_builder_handle_message(struct pakfire_builder* self, struct json_object* message) { const char* job_id = NULL; const char* type = NULL; @@ -471,10 +631,8 @@ static int pakfire_builder_handle_message(struct pakfire_builder* self, struct j } // Handle new jobs -#if 0 if (pakfire_string_equals(type, "job")) return pakfire_builder_job(self, message); -#endif // Log an error for any unknown messages ERROR(self->ctx, "Unknown message. Ignoring:\n%s\n", @@ -509,6 +667,13 @@ ERROR: return r; } +int pakfire_builder_send(struct pakfire_xfer* xfer, void* data) { + struct pakfire_builder* self = data; + + // Stream logs + return pakfire_builder_stream_logs(self); +} + int pakfire_builder_send_message(struct pakfire_builder* self, struct json_object* message) { const char* m = NULL; size_t length = 0; @@ -540,3 +705,51 @@ int pakfire_builder_send_message(struct pakfire_builder* self, struct json_objec return 0; } + +/* + Called when we are ready to stream the log. + + This function calls each running job which will only send one line + at a time to ensure that one job will not take the entire bandwidth. +*/ +int pakfire_builder_stream_logs(struct pakfire_builder* self) { + unsigned int lines; + int r; + + // Bail if we don't have a control connection + if (!self->control) + return 0; + + // Bail if the connection isn't ready to send + else if (!pakfire_xfer_is_ready_to_send(self->control)) + return 0; + + do { + // Reset lines + lines = 0; + + // Have every job send one line + for (unsigned int i = 0; i < MAX_JOBS; i++) { + // Skip any empty slots + if (!self->jobs[i]) + continue; + + // Stream logs + r = pakfire_job_stream_logs(self->jobs[i]); + if (r < 0) { + switch (-r) { + case EAGAIN: + return 0; + + default: + return r; + } + } + + // Add up lines + lines += r; + } + } while (lines); + + return 0; +} diff --git a/src/pakfire/builder.h b/src/pakfire/builder.h index 508f7113..d43a323b 100644 --- a/src/pakfire/builder.h +++ b/src/pakfire/builder.h @@ -28,6 +28,8 @@ struct pakfire_builder; #include #include +#include +#include int pakfire_builder_create(struct pakfire_builder** builder, struct pakfire_ctx* ctx, struct pakfire_client* client); @@ -42,7 +44,16 @@ int pakfire_builder_connect(struct pakfire_builder* self); int pakfire_builder_connected(struct pakfire_xfer* xfer, void* data); int pakfire_builder_close(struct pakfire_xfer* xfer, int code, void* data); int pakfire_builder_recv(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data); +int pakfire_builder_send(struct pakfire_xfer* xfer, void* data); int pakfire_builder_send_message(struct pakfire_builder* self, struct json_object* message); +// Jobs + +int pakfire_builder_terminate_jobs(struct pakfire_builder* self); +int pakfire_builder_job_finished(struct pakfire_builder* self, struct pakfire_job* job); + +// Stream Logs +int pakfire_builder_stream_logs(struct pakfire_builder* self); + #endif /* PAKFIRE_BUILDER_H */ diff --git a/src/pakfire/client.c b/src/pakfire/client.c index 84de034a..7a1bc52a 100644 --- a/src/pakfire/client.c +++ b/src/pakfire/client.c @@ -815,7 +815,7 @@ int pakfire_client_builder_connect(struct pakfire_client* self, struct pakfire_b r = pakfire_xfer_socket(xfer, pakfire_builder_connected, pakfire_builder_recv, - NULL, + pakfire_builder_send, pakfire_builder_close, builder); if (r < 0) diff --git a/src/pakfire/daemon.c b/src/pakfire/daemon.c index ae99ef86..9dce8989 100644 --- a/src/pakfire/daemon.c +++ b/src/pakfire/daemon.c @@ -33,14 +33,11 @@ #include #include #include -#include #include #include #include #include -#define MAX_JOBS 64 - struct pakfire_daemon { struct pakfire_ctx* ctx; int nrefs; @@ -63,47 +60,8 @@ struct pakfire_daemon { // cgroup struct pakfire_cgroup* cgroup; - - // Jobs - struct pakfire_job* jobs[MAX_JOBS]; - unsigned int running_jobs; }; -static struct pakfire_job* pakfire_daemon_find_job( - struct pakfire_daemon* self, const char* job_id) { - // Walk through all jobs - for (unsigned int i = 0; i < MAX_JOBS; i++) { - if (!self->jobs[i]) - continue; - - // Return the matching job - if (pakfire_job_has_id(self->jobs[i], job_id)) - return pakfire_job_ref(self->jobs[i]); - } - - return NULL; -} - -/* - Terminates all running jobs -*/ -static int pakfire_daemon_terminate_jobs(struct pakfire_daemon* daemon) { - int r; - - for (unsigned int i = 0; i < MAX_JOBS; i++) { - // Skip any empty slots - if (!daemon->jobs[i]) - continue; - - // Terminate the job - r = pakfire_job_terminate(daemon->jobs[i], SIGTERM); - if (r) - return r; - } - - return 0; -} - static int pakfire_daemon_terminate(sd_event_source* source, const struct signalfd_siginfo* si, void* data) { struct pakfire_daemon* daemon = data; @@ -111,7 +69,7 @@ static int pakfire_daemon_terminate(sd_event_source* source, DEBUG(daemon->ctx, "Received signal to terminate...\n"); // Terminate all jobs - pakfire_daemon_terminate_jobs(daemon); + pakfire_builder_terminate_jobs(daemon->builder); return sd_event_exit(sd_event_source_get_event(source), 0); } @@ -187,147 +145,6 @@ static int pakfire_daemon_release_inhibit_shutdown(struct pakfire_daemon* daemon return 0; } -/* - Called when we have received a message for a specific job -*/ -static int pakfire_daemon_handle_job_message( - struct pakfire_daemon* self, const char* job_id, struct json_object* message) { - struct pakfire_job* job = NULL; - int r; - - // Find the job - job = pakfire_daemon_find_job(self, job_id); - if (!job) { - WARN(self->ctx, "Received message for job %s which does not exist. Ignoring.\n", job_id); - return 0; - } - - // Dispatch the message to the job - r = pakfire_job_handle_message(job, message); - if (r < 0) - goto ERROR; - -ERROR: - if (job) - pakfire_job_unref(job); - - return r; -} - -/* - Called when a new job has been received -*/ -static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) { - struct pakfire_job* job = NULL; - struct json_object* data = NULL; - int r; - - // Inhibit shutdown - r = pakfire_daemon_inhibit_shutdown(daemon); - if (r < 0) - goto ERROR; - - // Fetch the data from the message - if (!json_object_object_get_ex(m, "data", &data)) { - ERROR(daemon->ctx, "Job does not have any data\n"); - r = -EINVAL; - goto ERROR; - } - - // Create a new job - r = pakfire_job_create(&job, daemon->ctx, daemon, data); - if (r) { - ERROR(daemon->ctx, "Could not create a new job: %s\n", strerror(-r)); - goto ERROR; - } - - // Launch the job - r = pakfire_job_launch(job); - if (r < 0) { - ERROR(daemon->ctx, "Failed to launch the job: %s\n", strerror(-r)); - goto ERROR; - } - - // Store the job - for (unsigned int i = 0; i < MAX_JOBS; i++) { - if (!daemon->jobs[i]) { - daemon->jobs[i] = pakfire_job_ref(job); - break; - } - } - - // Increment the number of running jobs - daemon->running_jobs++; - -ERROR: - if (job) - pakfire_job_unref(job); - - return r; -} - -/* - Called when we are ready to stream the log. - - This function calls each running job which will only send one line - at a time to ensure that one job will not take the entire bandwidth. -*/ -int pakfire_daemon_stream_logs(struct pakfire_daemon* self) { - unsigned int lines; - int r; - -#if 0 - // Bail if we don't have a control connection - if (!self->control) - return 0; - - // Bail if the connection isn't ready to send - else if (!pakfire_xfer_is_ready_to_send(self->control)) - return 0; -#endif - - do { - // Reset lines - lines = 0; - - // Have every job send one line - for (unsigned int i = 0; i < MAX_JOBS; i++) { - // Skip any empty slots - if (!self->jobs[i]) - continue; - - // Stream logs - r = pakfire_job_stream_logs(self->jobs[i]); - if (r < 0) { - switch (-r) { - case EAGAIN: - return 0; - - default: - return r; - } - } - - // Add up lines - lines += r; - } - } while (lines); - - return 0; -} - -static int pakfire_daemon_send(struct pakfire_xfer* xfer, void* data) { - struct pakfire_daemon* self = data; - int r; - - // Stream logs - r = pakfire_daemon_stream_logs(self); - if (r < 0) - return r; - - return 0; -} - /* Called when the client is ready and we can start making connections... */ @@ -610,35 +427,3 @@ ERROR: return 1; } - -/* - Called after a job has exited -*/ -int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job) { - int r; - - DEBUG(daemon->ctx, "Removing job %p\n", job); - - for (unsigned int i = 0; i < MAX_JOBS; i++) { - if (daemon->jobs[i] != job) - continue; - - // Dereference the job and clear the slot - pakfire_job_unref(daemon->jobs[i]); - daemon->jobs[i] = NULL; - - break; - } - - // Now we have one less job running - daemon->running_jobs--; - - // Release the shutdown inhibition if there are no more jobs running - if (!daemon->running_jobs) { - r = pakfire_daemon_release_inhibit_shutdown(daemon); - if (r < 0) - return r; - } - - return 0; -} diff --git a/src/pakfire/daemon.h b/src/pakfire/daemon.h index 0b2a4d59..265c4e19 100644 --- a/src/pakfire/daemon.h +++ b/src/pakfire/daemon.h @@ -40,11 +40,6 @@ const char* pakfire_daemon_url(struct pakfire_daemon* daemon); int pakfire_daemon_main(struct pakfire_daemon* daemon); -int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job); - -// Stream Logs -int pakfire_daemon_stream_logs(struct pakfire_daemon* self); - // Send message int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object* message); diff --git a/src/pakfire/job.c b/src/pakfire/job.c index dbcb7c51..c539c975 100644 --- a/src/pakfire/job.c +++ b/src/pakfire/job.c @@ -32,11 +32,11 @@ #include #include +#include #include #include #include #include -#include #include #include #include @@ -52,8 +52,8 @@ struct pakfire_job { struct pakfire_ctx* ctx; int nrefs; - // Daemon - struct pakfire_daemon* daemon; + // Builder + struct pakfire_builder* builder; // Client struct pakfire_client* client; @@ -254,8 +254,8 @@ static void pakfire_job_free(struct pakfire_job* job) { pakfire_config_unref(job->config); if (job->loop) sd_event_unref(job->loop); - if (job->daemon) - pakfire_daemon_unref(job->daemon); + if (job->builder) + pakfire_builder_unref(job->builder); if (job->ctx) pakfire_ctx_unref(job->ctx); free(job); @@ -277,10 +277,12 @@ static int pakfire_job_xfer_create(struct pakfire_xfer** xfer, if (r < 0) goto ERROR; +#if 0 // Set the base URL r = pakfire_xfer_set_baseurl(x, pakfire_daemon_url(job->daemon)); if (r < 0) goto ERROR; +#endif // Success *xfer = pakfire_xfer_ref(x); @@ -514,8 +516,8 @@ static int pakfire_job_exited(sd_event_source* s, const siginfo_t* si, void* dat break; } - // Let the daemon know this is finished - return pakfire_daemon_job_finished(job->daemon, job); + // Let the builder know this is finished + return pakfire_builder_job_finished(job->builder, job); } static int pakfire_job_send_log(struct pakfire_job* job, int priority, const char* line, size_t length) { @@ -526,8 +528,8 @@ static int pakfire_job_send_log(struct pakfire_job* job, int priority, const cha if (r < 0) return r; - // Ask the daemon to send it - return pakfire_daemon_stream_logs(job->daemon); + // Ask the builder to send it + return pakfire_builder_stream_logs(job->builder); } static int pakfire_job_stdout(struct pakfire_ctx* ctx, @@ -828,7 +830,7 @@ static int pakfire_job_send_log_line(struct pakfire_job* job, goto ERROR; // Send the message - r = pakfire_daemon_send_message(job->daemon, message); + r = pakfire_builder_send_message(job->builder, message); if (r < 0) { ERROR(job->ctx, "Could not send log message: %s\n", strerror(-r)); goto ERROR; @@ -878,7 +880,7 @@ int pakfire_job_stream_logs(struct pakfire_job* self) { } int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, - struct pakfire_daemon* daemon, json_object* data) { + struct pakfire_builder* builder, json_object* data) { struct pakfire_job* j = NULL; int r; @@ -893,8 +895,8 @@ int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, // Initialize the reference counter j->nrefs = 1; - // Store a reference to the daemon - j->daemon = pakfire_daemon_ref(daemon); + // Store a reference to the builder + j->builder = pakfire_builder_ref(builder); // Fetch a reference to the event loop r = pakfire_ctx_loop(j->ctx, &j->loop); @@ -903,8 +905,10 @@ int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, goto ERROR; } +#if 0 // Fetch a reference to the client j->client = pakfire_daemon_client(daemon); +#endif // Initialize the PID file descriptor j->pidfd = -EBADF; diff --git a/src/pakfire/job.h b/src/pakfire/job.h index 967ab173..a930fc7e 100644 --- a/src/pakfire/job.h +++ b/src/pakfire/job.h @@ -21,13 +21,13 @@ #ifndef PAKFIRE_JOB_H #define PAKFIRE_JOB_H +#include #include -#include struct pakfire_job; int pakfire_job_create(struct pakfire_job** worker, - struct pakfire_ctx* ctx, struct pakfire_daemon* daemon, json_object* data); + struct pakfire_ctx* ctx, struct pakfire_builder* builder, json_object* data); struct pakfire_job* pakfire_job_ref(struct pakfire_job* worker); struct pakfire_job* pakfire_job_unref(struct pakfire_job* worker);