git.ipfire.org Git - pakfire.git/commitdiff
daemon: Fork new worker process when we receive a build job
author Michael Tremer <michael.tremer@ipfire.org>
Fri, 27 May 2022 09:44:02 +0000 (09:44 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Fri, 27 May 2022 09:44:02 +0000 (09:44 +0000)
Previously, the worker processes were pre-forked and waited for a job,
which was randomly allocated.

This model is now changing since we no longer poll, but push build jobs
from the hub to the builders.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.py

index e279689eb4eeb6f94a726c5559c2b499f5dc09e2..3977d739290d0cea3d86eee7bd91d5f02d4c4151 100644 (file)
@@ -44,7 +44,7 @@ class Daemon(object):
                self.__running = True
 
                # List of worker processes.
-               self.__workers = []
+               self.workers = []
 
        def connect_to_hub(self):
                url = self.config.get("daemon", "server", PAKFIRE_HUB)
@@ -97,47 +97,23 @@ class Daemon(object):
                if self.queue:
                        self.queue.close()
 
-       def spawn_worker(self, *args, **kwargs):
-               """
-                       Spawns a new worker process.
-               """
-               worker = PakfireWorker(self.hub, *args, **kwargs)
-               worker.start()
-
-               self.log.debug("Spawned new worker process: %s" % worker)
-               self.__workers.append(worker)
-
-       def terminate_worker(self, worker):
-               """
-                       Terminates the given worker.
-               """
-               self.log.warning(_("Terminating worker process: %s") % worker)
-
-               worker.terminate()
-
        def terminate_all_workers(self):
                """
                        Terminates all workers.
                """
-               # First send SIGTERM to all processes.
+               self.log.debug("Sending SIGTERM to all workers")
+
+               # First send SIGTERM to all processes
                for worker in self.workers:
-                       self.terminate_worker(worker)
+                       worker.terminate()
+
+               self.log.debug("Waiting for workers to terminate")
 
                # Then wait until they all have finished.
                for worker in self.workers:
                        worker.join()
 
-       def remove_worker(self, worker):
-               """
-                       Removes a worker from the internal list of worker processes.
-               """
-               assert not worker.is_alive(), "Remove alive worker?"
-
-               self.log.debug("Removing worker: %s" % worker)
-               try:
-                       self.__workers.remove(worker)
-               except:
-                       pass
+               self.log.debug("All workers have finished")
 
        def cleanup_workers(self):
                """
@@ -149,32 +125,6 @@ class Daemon(object):
 
                        self.remove_worker(worker)
 
-       @property
-       def workers(self):
-               return [w for w in self.__workers if isinstance(w, PakfireWorker)]
-
-       @property
-       def running_workers(self):
-               workers = []
-
-               for worker in self.workers:
-                       if worker.waiting.is_set():
-                               continue
-
-                       workers.append(worker)
-
-               return workers
-
-       @property
-       def waiting_workers(self):
-               workers = []
-
-               for worker in self.workers:
-                       if worker.waiting.is_set():
-                               workers.append(worker)
-
-               return workers
-
        # Signal handling.
 
        def register_signal_handlers(self):
@@ -186,9 +136,27 @@ class Daemon(object):
                """
                        Handle signal SIGCHLD.
                """
-               # Spawn new workers if necessary.
-               #self.spawn_workers_if_needed()
-               pass
+               # Find the worker process that has terminated
+               for worker in self.workers:
+                       # Skip any workers that are still alive
+                       if worker.is_alive():
+                               continue
+
+                       self.log.debug("Worker %s has terminated with status %s" % \
+                               (worker.pid, worker.exitcode))
+
+                       # Remove the worker from the list
+                       try:
+                               self.workers.remove(worker)
+                       except ValueError:
+                               pass
+
+                       # Close the process
+                       worker.close()
+
+                       # We finish after handling one worker. If multiple workers have finished
+                       # at the same time, this handler will be called again to handle it.
+                       break
 
        def handle_SIGTERM(self, signum, frame):
                """
@@ -201,45 +169,62 @@ class Daemon(object):
                """
                        Called when this builder was assigned a new job
                """
-               log.debug("Received job:\n%s" % json.dumps(job, sort_keys=True, indent=4))
+               # Check for correct message type
+               if not job.get("message") == "job":
+                       raise RuntimeError("Received a message of an unknown type:\n%s" % job)
+
+               # Log what we have received
+               self.log.debug("Received job:")
+               self.log.debug("%s" % json.dumps(job, sort_keys=True, indent=4))
+
+               # Launch a new worker
+               worker = Worker(
+                       self.hub,
+                       job,
+                       debug=self.debug,
+               )
+               self.workers.append(worker)
 
-               # XXX todo
+               # Run it
+               worker.start()
 
+               self.log.debug("Spawned a new worker process as PID %s" % worker.pid)
 
-class PakfireWorker(multiprocessing.Process):
-       def __init__(self, hub, waiting=None):
+
+class Worker(multiprocessing.Process):
+       def __init__(self, hub, data, debug=False):
                multiprocessing.Process.__init__(self)
+
                self.hub = hub
 
-               # Waiting event. Clear if this worker is running a build.
-               self.waiting = multiprocessing.Event()
-               self.waiting.set()
+               # The job that has been received
+               self.data = data
 
-               # Indicates if this worker is running.
-               self.__running = True
+               # Setup a logger
+               self.log = logger.setup(
+                       "pakfire.daemon.worker.%s" % self.pid,
+                       level=logging.DEBUG if debug else logging.INFO,
+                       syslog_identifier="pakfire-worker",
+               )
 
        def run(self):
-               # Register signal handlers.
-               self.register_signal_handlers()
+               self.log.debug("Worker %s has launched" % self.pid)
 
-               while self.__running:
-                       # Try to get a new build job.
-                       job = self.get_new_build_job()
-                       if not job:
-                               continue
+               # Register signal handlers
+               self.register_signal_handlers()
 
-                       # If we got a job, we are not waiting anymore.
-                       self.waiting.clear()
+               # XXX Do something for now
+               import time
+               time.sleep(10)
 
-                       # Run the job and return.
-                       return self.execute_job(job)
+               self.log.debug("Worker %s terminated gracefully" % self.pid)
 
        def shutdown(self):
                self.__running = False
 
                # When we are just waiting, we can exit right away.
                if self.waiting.is_set():
-                       log.debug("Exiting immediately")
+                       self.log.debug("Exiting immediately")
                        sys.exit(1)
 
                # XXX figure out what to do, when a build is running.
@@ -265,20 +250,6 @@ class PakfireWorker(multiprocessing.Process):
                """
                self.shutdown()
 
-       def get_new_build_job(self, timeout=600):
-               log.debug("Requesting new job...")
-
-               try:
-                       job = self.hub._request("/builders/jobs/queue", method="GET", decode="json",
-                               data={ "timeout" : timeout, }, timeout=timeout)
-
-                       if job:
-                               return BuildJob(job)
-
-               # As this is a long poll request, it is okay to time out.
-               except TransportMaxTriesExceededError:
-                       pass
-
        def execute_job(self, job):
                log.debug("Executing job: %s" % job)