From: Michael Tremer Date: Fri, 27 May 2022 09:44:02 +0000 (+0000) Subject: daemon: Fork new worker process when we receive a build job X-Git-Tag: 0.9.28~730 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=891fb70990bb4840c5cb7af02bf0b294c5802d21;p=pakfire.git daemon: Fork new worker process when we receive a build job The previous system was that the worker processes would have been pre-forked and waiting for a job which was randomly allocated. This model is now changing since we no longer poll, but push build jobs from the hub to the builders. Signed-off-by: Michael Tremer --- diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index e279689eb..3977d7392 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -44,7 +44,7 @@ class Daemon(object): self.__running = True # List of worker processes. - self.__workers = [] + self.workers = [] def connect_to_hub(self): url = self.config.get("daemon", "server", PAKFIRE_HUB) @@ -97,47 +97,23 @@ class Daemon(object): if self.queue: self.queue.close() - def spawn_worker(self, *args, **kwargs): - """ - Spawns a new worker process. - """ - worker = PakfireWorker(self.hub, *args, **kwargs) - worker.start() - - self.log.debug("Spawned new worker process: %s" % worker) - self.__workers.append(worker) - - def terminate_worker(self, worker): - """ - Terminates the given worker. - """ - self.log.warning(_("Terminating worker process: %s") % worker) - - worker.terminate() - def terminate_all_workers(self): """ Terminates all workers. """ - # First send SIGTERM to all processes. + self.log.debug("Sending SIGTERM to all workers") + + # First send SIGTERM to all processes for worker in self.workers: - self.terminate_worker(worker) + worker.terminate() + + self.log.debug("Waiting for workers to terminate") # Then wait until they all have finished. for worker in self.workers: worker.join() - def remove_worker(self, worker): - """ - Removes a worker from the internal list of worker processes. - """ - assert not worker.is_alive(), "Remove alive worker?" - - self.log.debug("Removing worker: %s" % worker) - try: - self.__workers.remove(worker) - except: - pass + self.log.debug("All workers have finished") def cleanup_workers(self): """ @@ -149,32 +125,6 @@ class Daemon(object): self.remove_worker(worker) - @property - def workers(self): - return [w for w in self.__workers if isinstance(w, PakfireWorker)] - - @property - def running_workers(self): - workers = [] - - for worker in self.workers: - if worker.waiting.is_set(): - continue - - workers.append(worker) - - return workers - - @property - def waiting_workers(self): - workers = [] - - for worker in self.workers: - if worker.waiting.is_set(): - workers.append(worker) - - return workers - # Signal handling. def register_signal_handlers(self): @@ -186,9 +136,27 @@ class Daemon(object): """ Handle signal SIGCHLD. """ - # Spawn new workers if necessary. - #self.spawn_workers_if_needed() - pass + # Find the worker process that has terminated + for worker in self.workers: + # Skip any workers that are still alive + if worker.is_alive(): + continue + + self.log.debug("Worker %s has terminated with status %s" % \ + (worker.pid, worker.exitcode)) + + # Remove the worker from the list + try: + self.workers.remove(worker) + except ValueError: + pass + + # Close the process + worker.close() + + # We finish after handling one worker. If multiple workers have finished + # at the same time, this handler will be called again to handle it. + break def handle_SIGTERM(self, signum, frame): """ @@ -201,45 +169,62 @@ class Daemon(object): """ Called when this builder was assigned a new job """ - log.debug("Received job:\n%s" % json.dumps(job, sort_keys=True, indent=4)) + # Check for correct message type + if not job.get("message") == "job": + raise RuntimeError("Received a message of an unknown type:\n%s" % job) + + # Log what we have received + self.log.debug("Received job:") + self.log.debug("%s" % json.dumps(job, sort_keys=True, indent=4)) + + # Launch a new worker + worker = Worker( + self.hub, + job, + debug=self.debug, + ) + self.workers.append(worker) - # XXX todo + # Run it + worker.start() + self.log.debug("Spawned a new worker process as PID %s" % worker.pid) -class PakfireWorker(multiprocessing.Process): - def __init__(self, hub, waiting=None): + +class Worker(multiprocessing.Process): + def __init__(self, hub, data, debug=False): multiprocessing.Process.__init__(self) + self.hub = hub - # Waiting event. Clear if this worker is running a build. - self.waiting = multiprocessing.Event() - self.waiting.set() + # The job that has been received + self.data = data - # Indicates if this worker is running. - self.__running = True + # Setup a logger + self.log = logger.setup( + "pakfire.daemon.worker.%s" % self.pid, + level=logging.DEBUG if debug else logging.INFO, + syslog_identifier="pakfire-worker", + ) def run(self): - # Register signal handlers. - self.register_signal_handlers() + self.log.debug("Worker %s has launched" % self.pid) - while self.__running: - # Try to get a new build job. - job = self.get_new_build_job() - if not job: - continue + # Register signal handlers + self.register_signal_handlers() - # If we got a job, we are not waiting anymore. - self.waiting.clear() + # XXX Do something for now + import time + time.sleep(10) - # Run the job and return. - return self.execute_job(job) + self.log.debug("Worker %s terminated gracefully" % self.pid) def shutdown(self): self.__running = False # When we are just waiting, we can edit right away. if self.waiting.is_set(): - log.debug("Exiting immediately") + self.log.debug("Exiting immediately") sys.exit(1) # XXX figure out what to do, when a build is running. @@ -265,20 +250,6 @@ class PakfireWorker(multiprocessing.Process): """ self.shutdown() - def get_new_build_job(self, timeout=600): - log.debug("Requesting new job...") - - try: - job = self.hub._request("/builders/jobs/queue", method="GET", decode="json", - data={ "timeout" : timeout, }, timeout=timeout) - - if job: - return BuildJob(job) - - # As this is a long poll request, it is okay to time out. - except TransportMaxTriesExceededError: - pass - def execute_job(self, job): log.debug("Executing job: %s" % job)