]> git.ipfire.org Git - pakfire.git/commitdiff
builder: Move the job logic from the daemon
authorMichael Tremer <michael.tremer@ipfire.org>
Thu, 26 Jun 2025 09:42:44 +0000 (09:42 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Thu, 26 Jun 2025 09:42:44 +0000 (09:42 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/builder.c
src/pakfire/builder.h
src/pakfire/client.c
src/pakfire/daemon.c
src/pakfire/daemon.h
src/pakfire/job.c
src/pakfire/job.h

index 3c5eae3cdcf34a4166848bfdfbe357f9fe0f9039..984d7192167f9f1bc9e04743cbeb3c2c61b127b5 100644 (file)
 #include <pakfire/client.h>
 #include <pakfire/ctx.h>
 #include <pakfire/builder.h>
+#include <pakfire/job.h>
 #include <pakfire/json.h>
+#include <pakfire/string.h>
 #include <pakfire/util.h>
 #include <pakfire/xfer.h>
 
 // Submit stats every 30 seconds
 #define PAKFIRE_STATS_TIMER S_TO_US(30)
 
+#define MAX_JOBS 64
+
 struct pakfire_builder {
        // Context
        struct pakfire_ctx* ctx;
@@ -55,6 +59,10 @@ struct pakfire_builder {
        // Reconnect Timer & Holdoff Time
        sd_event_source* reconnect_timer;
        uint64_t reconnect_holdoff;
+
+       // Jobs
+       struct pakfire_job* jobs[MAX_JOBS];
+       unsigned int running_jobs;
 };
 
 static void pakfire_builder_free(struct pakfire_builder* self) {
@@ -397,6 +405,9 @@ int pakfire_builder_close(struct pakfire_xfer* xfer, int code, void* data) {
        struct pakfire_builder* self = data;
        int r;
 
+       // Log action
+       DEBUG(self->ctx, "Builder disconnected\n");
+
        // Remove the connection from the client
        r = pakfire_client_builder_disconnected(self->client, xfer);
        if (r < 0)
@@ -436,10 +447,159 @@ int pakfire_builder_close(struct pakfire_xfer* xfer, int code, void* data) {
        return 0;
 }
 
-static int pakfire_builder_handle_job_message(struct pakfire_builder* self, const char* job_id, struct json_object* message) {
-       return -EINVAL; // XXX TODO
+// Jobs
+
+static struct pakfire_job* pakfire_builder_find_job(
+               struct pakfire_builder* self, const char* job_id) {
+       // Walk through all jobs
+       for (unsigned int i = 0; i < MAX_JOBS; i++) {
+               if (!self->jobs[i])
+                       continue;
+
+               // Return the matching job
+               if (pakfire_job_has_id(self->jobs[i], job_id))
+                       return pakfire_job_ref(self->jobs[i]);
+       }
+
+       return NULL;
+}
+
+/*
+       Terminates all running jobs
+*/
+int pakfire_builder_terminate_jobs(struct pakfire_builder* self) {
+       int r;
+
+       for (unsigned int i = 0; i < MAX_JOBS; i++) {
+               // Skip any empty slots
+               if (!self->jobs[i])
+                       continue;
+
+               // Terminate the job
+               r = pakfire_job_terminate(self->jobs[i], SIGTERM);
+               if (r)
+                       return r;
+       }
+
+       return 0;
+}
+
+/*
+       Called after a job has exited
+*/
+int pakfire_builder_job_finished(struct pakfire_builder* self, struct pakfire_job* job) {
+       int r;
+
+       DEBUG(self->ctx, "Removing job %p\n", job);
+
+       for (unsigned int i = 0; i < MAX_JOBS; i++) {
+               if (self->jobs[i] != job)
+                       continue;
+
+               // Dereference the job and clear the slot
+               pakfire_job_unref(self->jobs[i]);
+               self->jobs[i] = NULL;
+
+               break;
+       }
+
+       // Now we have one less job running
+       self->running_jobs--;
+
+#if 0
+       // Release the shutdown inhibition if there are no more jobs running
+       if (!self->running_jobs) {
+               r = pakfire_daemon_release_inhibit_shutdown(daemon);
+               if (r < 0)
+                       return r;
+       }
+#endif
+
+       return 0;
+}
+
+/*
+       Called when we have received a message for a specific job
+*/
+static int pakfire_builder_handle_job_message(
+               struct pakfire_builder* self, const char* job_id, struct json_object* message) {
+       struct pakfire_job* job = NULL;
+       int r;
+
+       // Find the job
+       job = pakfire_builder_find_job(self, job_id);
+       if (!job) {
+               WARN(self->ctx, "Received message for job %s which does not exist. Ignoring.\n", job_id);
+               return 0;
+       }
+
+       // Dispatch the message to the job
+       r = pakfire_job_handle_message(job, message);
+       if (r < 0)
+               goto ERROR;
+
+ERROR:
+       if (job)
+               pakfire_job_unref(job);
+
+       return r;
+}
+
+/*
+       Called when a new job has been received
+*/
+static int pakfire_builder_job(struct pakfire_builder* self, json_object* m) {
+       struct json_object* data = NULL;
+       struct pakfire_job* job = NULL;
+       int r;
+
+#if 0
+       // Inhibit shutdown
+       r = pakfire_daemon_inhibit_shutdown(daemon);
+       if (r < 0)
+               goto ERROR;
+#endif
+
+       // Fetch the data from the message
+       if (!json_object_object_get_ex(m, "data", &data)) {
+               ERROR(self->ctx, "Job does not have any data\n");
+               r = -EINVAL;
+               goto ERROR;
+       }
+
+       // Create a new job
+       r = pakfire_job_create(&job, self->ctx, self, data);
+       if (r) {
+               ERROR(self->ctx, "Could not create a new job: %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       // Launch the job
+       r = pakfire_job_launch(job);
+       if (r < 0) {
+               ERROR(self->ctx, "Failed to launch the job: %s\n", strerror(-r));
+               goto ERROR;
+       }
+
+       // Store the job
+       for (unsigned int i = 0; i < MAX_JOBS; i++) {
+               if (!self->jobs[i]) {
+                       self->jobs[i] = pakfire_job_ref(job);
+                       break;
+               }
+       }
+
+       // Increment the number of running jobs
+       self->running_jobs++;
+
+ERROR:
+       if (job)
+               pakfire_job_unref(job);
+
+       return r;
 }
 
+
 static int pakfire_builder_handle_message(struct pakfire_builder* self, struct json_object* message) {
        const char* job_id = NULL;
        const char* type = NULL;
@@ -471,10 +631,8 @@ static int pakfire_builder_handle_message(struct pakfire_builder* self, struct j
        }
 
        // Handle new jobs
-#if 0
        if (pakfire_string_equals(type, "job"))
                return pakfire_builder_job(self, message);
-#endif
 
        // Log an error for any unknown messages
        ERROR(self->ctx, "Unknown message. Ignoring:\n%s\n",
@@ -509,6 +667,13 @@ ERROR:
        return r;
 }
 
+int pakfire_builder_send(struct pakfire_xfer* xfer, void* data) {
+       struct pakfire_builder* self = data;
+
+       // Stream logs
+       return pakfire_builder_stream_logs(self);
+}
+
 int pakfire_builder_send_message(struct pakfire_builder* self, struct json_object* message) {
        const char* m = NULL;
        size_t length = 0;
@@ -540,3 +705,51 @@ int pakfire_builder_send_message(struct pakfire_builder* self, struct json_objec
 
        return 0;
 }
+
+/*
+       Called when we are ready to stream the log.
+
+       This function calls each running job which will only send one line
+       at a time to ensure that one job will not take the entire bandwidth.
+*/
+int pakfire_builder_stream_logs(struct pakfire_builder* self) {
+       unsigned int lines;
+       int r;
+
+       // Bail if we don't have a control connection
+       if (!self->control)
+               return 0;
+
+       // Bail if the connection isn't ready to send
+       else if (!pakfire_xfer_is_ready_to_send(self->control))
+               return 0;
+
+       do {
+               // Reset lines
+               lines = 0;
+
+               // Have every job send one line
+               for (unsigned int i = 0; i < MAX_JOBS; i++) {
+                       // Skip any empty slots
+                       if (!self->jobs[i])
+                               continue;
+
+                       // Stream logs
+                       r = pakfire_job_stream_logs(self->jobs[i]);
+                       if (r < 0) {
+                               switch (-r) {
+                                       case EAGAIN:
+                                               return 0;
+
+                                       default:
+                                               return r;
+                               }
+                       }
+
+                       // Add up lines
+                       lines += r;
+               }
+       } while (lines);
+
+       return 0;
+}
index 508f711349a4211945c0118ec67ff254bcb0cc33..d43a323bac990c359ee9ab6709510094f2fdb5b4 100644 (file)
@@ -28,6 +28,8 @@ struct pakfire_builder;
 
 #include <pakfire/client.h>
 #include <pakfire/ctx.h>
+#include <pakfire/job.h>
+#include <pakfire/xfer.h>
 
 int pakfire_builder_create(struct pakfire_builder** builder,
        struct pakfire_ctx* ctx, struct pakfire_client* client);
@@ -42,7 +44,16 @@ int pakfire_builder_connect(struct pakfire_builder* self);
 int pakfire_builder_connected(struct pakfire_xfer* xfer, void* data);
 int pakfire_builder_close(struct pakfire_xfer* xfer, int code, void* data);
 int pakfire_builder_recv(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data);
+int pakfire_builder_send(struct pakfire_xfer* xfer, void* data);
 
 int pakfire_builder_send_message(struct pakfire_builder* self, struct json_object* message);
 
+// Jobs
+
+int pakfire_builder_terminate_jobs(struct pakfire_builder* self);
+int pakfire_builder_job_finished(struct pakfire_builder* self, struct pakfire_job* job);
+
+// Stream Logs
+int pakfire_builder_stream_logs(struct pakfire_builder* self);
+
 #endif /* PAKFIRE_BUILDER_H */
index 84de034a769c40a8a973af07ed8c852b9c739355..7a1bc52a3276881802ae78eb47d5e6178aa1eb20 100644 (file)
@@ -815,7 +815,7 @@ int pakfire_client_builder_connect(struct pakfire_client* self, struct pakfire_b
        r = pakfire_xfer_socket(xfer,
                        pakfire_builder_connected,
                        pakfire_builder_recv,
-                       NULL,
+                       pakfire_builder_send,
                        pakfire_builder_close,
                        builder);
        if (r < 0)
index ae99ef86dd3e19bddfb10b1cfbad2f52db41e3f4..9dce89894b5b672e4828d9f8249452d9880eec65 100644 (file)
 #include <pakfire/client.h>
 #include <pakfire/ctx.h>
 #include <pakfire/daemon.h>
-#include <pakfire/job.h>
 #include <pakfire/json.h>
 #include <pakfire/proctitle.h>
 #include <pakfire/string.h>
 #include <pakfire/util.h>
 
-#define MAX_JOBS 64
-
 struct pakfire_daemon {
        struct pakfire_ctx* ctx;
        int nrefs;
@@ -63,47 +60,8 @@ struct pakfire_daemon {
 
        // cgroup
        struct pakfire_cgroup* cgroup;
-
-       // Jobs
-       struct pakfire_job* jobs[MAX_JOBS];
-       unsigned int running_jobs;
 };
 
-static struct pakfire_job* pakfire_daemon_find_job(
-               struct pakfire_daemon* self, const char* job_id) {
-       // Walk through all jobs
-       for (unsigned int i = 0; i < MAX_JOBS; i++) {
-               if (!self->jobs[i])
-                       continue;
-
-               // Return the matching job
-               if (pakfire_job_has_id(self->jobs[i], job_id))
-                       return pakfire_job_ref(self->jobs[i]);
-       }
-
-       return NULL;
-}
-
-/*
-       Terminates all running jobs
-*/
-static int pakfire_daemon_terminate_jobs(struct pakfire_daemon* daemon) {
-       int r;
-
-       for (unsigned int i = 0; i < MAX_JOBS; i++) {
-               // Skip any empty slots
-               if (!daemon->jobs[i])
-                       continue;
-
-               // Terminate the job
-               r = pakfire_job_terminate(daemon->jobs[i], SIGTERM);
-               if (r)
-                       return r;
-       }
-
-       return 0;
-}
-
 static int pakfire_daemon_terminate(sd_event_source* source,
                const struct signalfd_siginfo* si, void* data) {
        struct pakfire_daemon* daemon = data;
@@ -111,7 +69,7 @@ static int pakfire_daemon_terminate(sd_event_source* source,
        DEBUG(daemon->ctx, "Received signal to terminate...\n");
 
        // Terminate all jobs
-       pakfire_daemon_terminate_jobs(daemon);
+       pakfire_builder_terminate_jobs(daemon->builder);
 
        return sd_event_exit(sd_event_source_get_event(source), 0);
 }
@@ -187,147 +145,6 @@ static int pakfire_daemon_release_inhibit_shutdown(struct pakfire_daemon* daemon
        return 0;
 }
 
-/*
-       Called when we have received a message for a specific job
-*/
-static int pakfire_daemon_handle_job_message(
-               struct pakfire_daemon* self, const char* job_id, struct json_object* message) {
-       struct pakfire_job* job = NULL;
-       int r;
-
-       // Find the job
-       job = pakfire_daemon_find_job(self, job_id);
-       if (!job) {
-               WARN(self->ctx, "Received message for job %s which does not exist. Ignoring.\n", job_id);
-               return 0;
-       }
-
-       // Dispatch the message to the job
-       r = pakfire_job_handle_message(job, message);
-       if (r < 0)
-               goto ERROR;
-
-ERROR:
-       if (job)
-               pakfire_job_unref(job);
-
-       return r;
-}
-
-/*
-       Called when a new job has been received
-*/
-static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
-       struct pakfire_job* job = NULL;
-       struct json_object* data = NULL;
-       int r;
-
-       // Inhibit shutdown
-       r = pakfire_daemon_inhibit_shutdown(daemon);
-       if (r < 0)
-               goto ERROR;
-
-       // Fetch the data from the message
-       if (!json_object_object_get_ex(m, "data", &data)) {
-               ERROR(daemon->ctx, "Job does not have any data\n");
-               r = -EINVAL;
-               goto ERROR;
-       }
-
-       // Create a new job
-       r = pakfire_job_create(&job, daemon->ctx, daemon, data);
-       if (r) {
-               ERROR(daemon->ctx, "Could not create a new job: %s\n", strerror(-r));
-               goto ERROR;
-       }
-
-       // Launch the job
-       r = pakfire_job_launch(job);
-       if (r < 0) {
-               ERROR(daemon->ctx, "Failed to launch the job: %s\n", strerror(-r));
-               goto ERROR;
-       }
-
-       // Store the job
-       for (unsigned int i = 0; i < MAX_JOBS; i++) {
-               if (!daemon->jobs[i]) {
-                       daemon->jobs[i] = pakfire_job_ref(job);
-                       break;
-               }
-       }
-
-       // Increment the number of running jobs
-       daemon->running_jobs++;
-
-ERROR:
-       if (job)
-               pakfire_job_unref(job);
-
-       return r;
-}
-
-/*
-       Called when we are ready to stream the log.
-
-       This function calls each running job which will only send one line
-       at a time to ensure that one job will not take the entire bandwidth.
-*/
-int pakfire_daemon_stream_logs(struct pakfire_daemon* self) {
-       unsigned int lines;
-       int r;
-
-#if 0
-       // Bail if we don't have a control connection
-       if (!self->control)
-               return 0;
-
-       // Bail if the connection isn't ready to send
-       else if (!pakfire_xfer_is_ready_to_send(self->control))
-               return 0;
-#endif
-
-       do {
-               // Reset lines
-               lines = 0;
-
-               // Have every job send one line
-               for (unsigned int i = 0; i < MAX_JOBS; i++) {
-                       // Skip any empty slots
-                       if (!self->jobs[i])
-                               continue;
-
-                       // Stream logs
-                       r = pakfire_job_stream_logs(self->jobs[i]);
-                       if (r < 0) {
-                               switch (-r) {
-                                       case EAGAIN:
-                                               return 0;
-
-                                       default:
-                                               return r;
-                               }
-                       }
-
-                       // Add up lines
-                       lines += r;
-               }
-       } while (lines);
-
-       return 0;
-}
-
-static int pakfire_daemon_send(struct pakfire_xfer* xfer, void* data) {
-       struct pakfire_daemon* self = data;
-       int r;
-
-       // Stream logs
-       r = pakfire_daemon_stream_logs(self);
-       if (r < 0)
-               return r;
-
-       return 0;
-}
-
 /*
        Called when the client is ready and we can start making connections...
 */
@@ -610,35 +427,3 @@ ERROR:
 
        return 1;
 }
-
-/*
-       Called after a job has exited
-*/
-int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job) {
-       int r;
-
-       DEBUG(daemon->ctx, "Removing job %p\n", job);
-
-       for (unsigned int i = 0; i < MAX_JOBS; i++) {
-               if (daemon->jobs[i] != job)
-                       continue;
-
-               // Dereference the job and clear the slot
-               pakfire_job_unref(daemon->jobs[i]);
-               daemon->jobs[i] = NULL;
-
-               break;
-       }
-
-       // Now we have one less job running
-       daemon->running_jobs--;
-
-       // Release the shutdown inhibition if there are no more jobs running
-       if (!daemon->running_jobs) {
-               r = pakfire_daemon_release_inhibit_shutdown(daemon);
-               if (r < 0)
-                       return r;
-       }
-
-       return 0;
-}
index 0b2a4d59864dbfe6e1101cf8b84381ea4cca0ea9..265c4e190546adc7cd80274fd3662cf16dbd9736 100644 (file)
@@ -40,11 +40,6 @@ const char* pakfire_daemon_url(struct pakfire_daemon* daemon);
 
 int pakfire_daemon_main(struct pakfire_daemon* daemon);
 
-int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job);
-
-// Stream Logs
-int pakfire_daemon_stream_logs(struct pakfire_daemon* self);
-
 // Send message
 int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object* message);
 
index dbcb7c51a7bcfc2df3d5fde3875a264f53aa8c29..c539c975bcedff457a3fc36ce7f9e80a4725eec4 100644 (file)
 
 #include <pakfire/base64.h>
 #include <pakfire/build.h>
+#include <pakfire/builder.h>
 #include <pakfire/client.h>
 #include <pakfire/config.h>
 #include <pakfire/constants.h>
 #include <pakfire/ctx.h>
-#include <pakfire/daemon.h>
 #include <pakfire/job.h>
 #include <pakfire/json.h>
 #include <pakfire/log_file.h>
@@ -52,8 +52,8 @@ struct pakfire_job {
        struct pakfire_ctx* ctx;
        int nrefs;
 
-       // Daemon
-       struct pakfire_daemon* daemon;
+       // Builder
+       struct pakfire_builder* builder;
 
        // Client
        struct pakfire_client* client;
@@ -254,8 +254,8 @@ static void pakfire_job_free(struct pakfire_job* job) {
                pakfire_config_unref(job->config);
        if (job->loop)
                sd_event_unref(job->loop);
-       if (job->daemon)
-               pakfire_daemon_unref(job->daemon);
+       if (job->builder)
+               pakfire_builder_unref(job->builder);
        if (job->ctx)
                pakfire_ctx_unref(job->ctx);
        free(job);
@@ -277,10 +277,12 @@ static int pakfire_job_xfer_create(struct pakfire_xfer** xfer,
        if (r < 0)
                goto ERROR;
 
+#if 0
        // Set the base URL
        r = pakfire_xfer_set_baseurl(x, pakfire_daemon_url(job->daemon));
        if (r < 0)
                goto ERROR;
+#endif
 
        // Success
        *xfer = pakfire_xfer_ref(x);
@@ -514,8 +516,8 @@ static int pakfire_job_exited(sd_event_source* s, const siginfo_t* si, void* dat
                        break;
        }
 
-       // Let the daemon know this is finished
-       return pakfire_daemon_job_finished(job->daemon, job);
+       // Let the builder know this is finished
+       return pakfire_builder_job_finished(job->builder, job);
 }
 
 static int pakfire_job_send_log(struct pakfire_job* job, int priority, const char* line, size_t length) {
@@ -526,8 +528,8 @@ static int pakfire_job_send_log(struct pakfire_job* job, int priority, const cha
        if (r < 0)
                return r;
 
-       // Ask the daemon to send it
-       return pakfire_daemon_stream_logs(job->daemon);
+       // Ask the builder to send it
+       return pakfire_builder_stream_logs(job->builder);
 }
 
 static int pakfire_job_stdout(struct pakfire_ctx* ctx,
@@ -828,7 +830,7 @@ static int pakfire_job_send_log_line(struct pakfire_job* job,
                goto ERROR;
 
        // Send the message
-       r = pakfire_daemon_send_message(job->daemon, message);
+       r = pakfire_builder_send_message(job->builder, message);
        if (r < 0) {
                ERROR(job->ctx, "Could not send log message: %s\n", strerror(-r));
                goto ERROR;
@@ -878,7 +880,7 @@ int pakfire_job_stream_logs(struct pakfire_job* self) {
 }
 
 int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
-               struct pakfire_daemon* daemon, json_object* data) {
+               struct pakfire_builder* builder, json_object* data) {
        struct pakfire_job* j = NULL;
        int r;
 
@@ -893,8 +895,8 @@ int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
        // Initialize the reference counter
        j->nrefs = 1;
 
-       // Store a reference to the daemon
-       j->daemon = pakfire_daemon_ref(daemon);
+       // Store a reference to the builder
+       j->builder = pakfire_builder_ref(builder);
 
        // Fetch a reference to the event loop
        r = pakfire_ctx_loop(j->ctx, &j->loop);
@@ -903,8 +905,10 @@ int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
                goto ERROR;
        }
 
+#if 0
        // Fetch a reference to the client
        j->client = pakfire_daemon_client(daemon);
+#endif
 
        // Initialize the PID file descriptor
        j->pidfd = -EBADF;
index 967ab173072ca27ee9044e854327871b9c952328..a930fc7e1f704782dedd1e3bcb0837b5196ad86c 100644 (file)
 #ifndef PAKFIRE_JOB_H
 #define PAKFIRE_JOB_H
 
+#include <pakfire/builder.h>
 #include <pakfire/ctx.h>
-#include <pakfire/daemon.h>
 
 struct pakfire_job;
 
 int pakfire_job_create(struct pakfire_job** worker,
-       struct pakfire_ctx* ctx, struct pakfire_daemon* daemon, json_object* data);
+       struct pakfire_ctx* ctx, struct pakfire_builder* builder, json_object* data);
 
 struct pakfire_job* pakfire_job_ref(struct pakfire_job* worker);
 struct pakfire_job* pakfire_job_unref(struct pakfire_job* worker);