From 60175a9e0666eb42a8166e076ec53dfc59cf9f3c Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Sun, 26 Jan 2025 14:01:32 +0000 Subject: [PATCH] builders: Refactor dispatching jobs Signed-off-by: Michael Tremer --- src/buildservice/builders.py | 47 +++++++++++------------------------- src/buildservice/jobs.py | 43 +++++++++++++++++++++++++++++---- 2 files changed, 52 insertions(+), 38 deletions(-) diff --git a/src/buildservice/builders.py b/src/buildservice/builders.py index ba8fb2a0..687e728b 100644 --- a/src/buildservice/builders.py +++ b/src/buildservice/builders.py @@ -4,6 +4,7 @@ import asyncio import botocore.exceptions import datetime import functools +import json import logging import sqlalchemy @@ -909,42 +910,22 @@ class Builder(database.Base, database.BackendMixin, database.SoftDeleteMixin): connected_from = Column(INET) - def dispatch_job(self, job): - # Throw an error if the builder isn't online any more - if not self.is_online(): - raise BuilderNotOnlineError + # Send Message - log.debug("Dispatching build job %s to %s" % (job, self)) - - # Assign this builder to the job - job.assign(builder=self) - - # Send a message to the builder - self.connection.write_message({ - "type" : "job", - "data" : { - # Add job information - "id" : "%s" % job.uuid, - "name" : "%s" % job, - "arch" : job.arch, - - # ccache path - "ccache" : { - "enabled" : job.ccache_enabled, - "path" : job.ccache_path, - }, - - # Is this a test job? - "test" : job.is_test(), - - # Send the pakfire configuration without using any mirrors - "conf" : "%s" % job.pakfire(build=True, mirrored=False), + async def send_message(self, message): + """ + Sends a message to the builder + """ + # Raise an error if the builder is not online + if not self.connection: + raise BuilderNotOnlineError(self) - # URL to the package - "pkg" : job.pkg.download_url, - }, - }) + # Log action + log.debug("Sending message to %s:\n%s", + self, json.dumps(message, indent=4, sort_keys=True)) + # Write the message to the control connection + return self.connection.write_message(message) # Uploads diff --git a/src/buildservice/jobs.py b/src/buildservice/jobs.py index f28a5276..3020460e 100644 --- a/src/buildservice/jobs.py +++ b/src/buildservice/jobs.py @@ -372,7 +372,7 @@ class Queue(base.Object): continue # If we have a job, we dispatch it to the builder - builder.dispatch_job(job) + await job.dispatch(builder) # Once we dispatched a job, we are done break @@ -641,18 +641,51 @@ class Job(database.Base, database.BackendMixin, database.SoftDeleteMixin): # Return the remaining time return self.timeout - self.duration - def assign(self, builder): + async def dispatch(self, builder): """ - Assigns this job to a builder + Called to dispatch this job to the given builder """ - log.info("Starting job %s on %s" % (self, builder)) + log.debug("Dispatching %s to %s" % (self, builder)) - # Store the assigned builder + # Store the builder self.builder = builder # Store the time self.started_at = sqlalchemy.func.current_timestamp() + # Launch Pakfire in build mode without any mirrors + pakfire = self.pakfire(build=True, mirrored=False) + + # Generate the configuration file + config = await pakfire.config() + + # Send a message to the builder + await builder.send_message({ + "type" : "job", + + "data" : { + # Add job information + "id" : "%s" % self.uuid, + "name" : "%s" % self, + "arch" : self.arch, + + # ccache path + "ccache" : { + "enabled" : self.ccache_enabled, + "path" : self.ccache_path, + }, + + # Is this a test job? + "test" : self.is_test(), + + # Configuration + "config" : config.getvalue(), + + # URL to the package + "pkg" : self.pkg.download_url, + }, + }) + def connected(self, connection): """ Called when a builder has connected -- 2.47.3