self.__running = True
# List of worker processes.
- self.__workers = []
+ self.workers = []
def connect_to_hub(self):
url = self.config.get("daemon", "server", PAKFIRE_HUB)
if self.queue:
self.queue.close()
- def spawn_worker(self, *args, **kwargs):
- """
- Spawns a new worker process.
- """
- worker = PakfireWorker(self.hub, *args, **kwargs)
- worker.start()
-
- self.log.debug("Spawned new worker process: %s" % worker)
- self.__workers.append(worker)
-
- def terminate_worker(self, worker):
- """
- Terminates the given worker.
- """
- self.log.warning(_("Terminating worker process: %s") % worker)
-
- worker.terminate()
-
def terminate_all_workers(self):
"""
Terminates all workers.
"""
- # First send SIGTERM to all processes.
+ self.log.debug("Sending SIGTERM to all workers")
+
+ # First send SIGTERM to all processes
for worker in self.workers:
- self.terminate_worker(worker)
+ worker.terminate()
+
+ self.log.debug("Waiting for workers to terminate")
# Then wait until they all have finished.
for worker in self.workers:
worker.join()
- def remove_worker(self, worker):
- """
- Removes a worker from the internal list of worker processes.
- """
- assert not worker.is_alive(), "Remove alive worker?"
-
- self.log.debug("Removing worker: %s" % worker)
- try:
- self.__workers.remove(worker)
- except:
- pass
+ self.log.debug("All workers have finished")
def cleanup_workers(self):
"""
self.remove_worker(worker)
- @property
- def workers(self):
- return [w for w in self.__workers if isinstance(w, PakfireWorker)]
-
- @property
- def running_workers(self):
- workers = []
-
- for worker in self.workers:
- if worker.waiting.is_set():
- continue
-
- workers.append(worker)
-
- return workers
-
- @property
- def waiting_workers(self):
- workers = []
-
- for worker in self.workers:
- if worker.waiting.is_set():
- workers.append(worker)
-
- return workers
-
# Signal handling.
def register_signal_handlers(self):
"""
Handle signal SIGCHLD.
"""
- # Spawn new workers if necessary.
- #self.spawn_workers_if_needed()
- pass
+ # Find the worker process that has terminated
+ for worker in self.workers:
+ # Skip any workers that are still alive
+ if worker.is_alive():
+ continue
+
+ self.log.debug("Worker %s has terminated with status %s" % \
+ (worker.pid, worker.exitcode))
+
+ # Remove the worker from the list
+ try:
+ self.workers.remove(worker)
+ except ValueError:
+ pass
+
+ # Close the process
+ worker.close()
+
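+ # We finish after handling one worker. If multiple workers have finished at
+ # the same time, this relies on the handler being called again for each one.
+ # NOTE(review): simultaneous SIGCHLDs may be merged into a single delivery,
+ # in which case terminated workers would stay in the list - confirm.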
+ break
def handle_SIGTERM(self, signum, frame):
"""
"""
Called when this builder was assigned a new job
"""
- log.debug("Received job:\n%s" % json.dumps(job, sort_keys=True, indent=4))
+ # Check for correct message type
+ if not job.get("message") == "job":
+ raise RuntimeError("Received a message of an unknown type:\n%s" % job)
+
+ # Log what we have received
+ self.log.debug("Received job:")
+ self.log.debug("%s" % json.dumps(job, sort_keys=True, indent=4))
+
+ # Launch a new worker
+ worker = Worker(
+ self.hub,
+ job,
+ debug=self.debug,
+ )
+ self.workers.append(worker)
- # XXX todo
+ # Run it
+ worker.start()
+ self.log.debug("Spawned a new worker process as PID %s" % worker.pid)
-class PakfireWorker(multiprocessing.Process):
- def __init__(self, hub, waiting=None):
+
+class Worker(multiprocessing.Process):
+ def __init__(self, hub, data, debug=False):
multiprocessing.Process.__init__(self)
+
self.hub = hub
- # Waiting event. Clear if this worker is running a build.
- self.waiting = multiprocessing.Event()
- self.waiting.set()
+ # The job that has been received
+ self.data = data
- # Indicates if this worker is running.
- self.__running = True
+ # Setup a logger
+ self.log = logger.setup(
+ "pakfire.daemon.worker.%s" % self.pid,
+ level=logging.DEBUG if debug else logging.INFO,
+ syslog_identifier="pakfire-worker",
+ )
def run(self):
- # Register signal handlers.
- self.register_signal_handlers()
+ self.log.debug("Worker %s has launched" % self.pid)
- while self.__running:
- # Try to get a new build job.
- job = self.get_new_build_job()
- if not job:
- continue
+ # Register signal handlers
+ self.register_signal_handlers()
- # If we got a job, we are not waiting anymore.
- self.waiting.clear()
+ # XXX Do something for now
+ import time
+ time.sleep(10)
- # Run the job and return.
- return self.execute_job(job)
+ self.log.debug("Worker %s terminated gracefully" % self.pid)
def shutdown(self):
self.__running = False
# When we are just waiting, we can exit right away.
if self.waiting.is_set():
- log.debug("Exiting immediately")
+ self.log.debug("Exiting immediately")
sys.exit(1)
# XXX figure out what to do when a build is running.
"""
self.shutdown()
- def get_new_build_job(self, timeout=600):
- log.debug("Requesting new job...")
-
- try:
- job = self.hub._request("/builders/jobs/queue", method="GET", decode="json",
- data={ "timeout" : timeout, }, timeout=timeout)
-
- if job:
- return BuildJob(job)
-
- # As this is a long poll request, it is okay to time out.
- except TransportMaxTriesExceededError:
- pass
-
def execute_job(self, job):
log.debug("Executing job: %s" % job)