From: Michael Tremer Date: Sun, 29 May 2022 12:09:36 +0000 (+0000) Subject: daemon: Add logging during the build job X-Git-Tag: 0.9.28~719 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cae7ad0edfac15e0fd18aa0bc5e95ba95942039a;p=pakfire.git daemon: Add logging during the build job Signed-off-by: Michael Tremer --- diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py index dab977c0b..cea06d980 100644 --- a/src/pakfire/daemon.py +++ b/src/pakfire/daemon.py @@ -4,6 +4,7 @@ import asyncio import functools import json import logging +import logging.handlers import multiprocessing import setproctitle import signal @@ -240,14 +241,36 @@ class Worker(multiprocessing.Process): if not pkg: raise ValueError("Did not received a package URL") + # Connect to the hub + self.job = await self.hub.job(job_id) + + # Setup build logger + logger = BuildLogger(self.log, self.job) + + # Send a new status + await self.job.status("building") + + # Run the build + build = self._build(pkg, arch=arch, + logger=logger._log, build_id=job_id) + + # Wait until the build process is done and stream the log in the meantime + while not build.done(): + await logger.stream(timeout=1) + + def _build(self, pkg, arch=None, logger=None, **kwargs): + """ + Sets up a new Pakfire instance and runs it in a new thread. + + This method returns an async.Task() object which can be used to track + if this job is still running. + """ # Setup Pakfire instance try: p = _pakfire.Pakfire( conf=self.pakfire_conf, arch=arch, - - # Set up logging - #logger=logger.log, + logger=logger, # Enable build mode and disable snapshots build=True, @@ -258,8 +281,11 @@ class Worker(multiprocessing.Process): # Delete the configuration file os.unlink(self.pakfire_conf) - # Run the build - p.build(pkg, build_id=job_id) + # Run the build in a new thread + thread = asyncio.to_thread(p.build, pkg, **kwargs) + + # Return a task + return asyncio.create_task(thread) def shutdown(self): self.log.debug("Shutting down worker %s" % self.pid) @@ -302,3 +328,46 @@ class Worker(multiprocessing.Process): # Return the path return f.name + + +class BuildLogger(object): + """ + This class groups together all sorts of logging. + """ + def __init__(self, log, job): + self.log = log + self.job = job + + # Create a logfile + self.logfile = tempfile.SpooledTemporaryFile(mode="w+", max_size=1048576) + + # Create a FIFO queue to buffer any log messages + self.queue = asyncio.Queue() + + # Create a new logger + self.logger = self.log.getChild(self.job.id) + self.logger.propagate = False + self.logger.setLevel(logging.DEBUG) + + # Log everything to the queue + handler = logging.handlers.QueueHandler(self.queue) + self.logger.addHandler(handler) + + # Log everything to the file + handler = logging.StreamHandler(self.logfile) + self.logger.addHandler(handler) + + def _log(self, level, message): + return self.logger.log(level, message) + + async def stream(self, timeout=0): + self.log.debug("Log streamer started") + + while True: + # Fetch a message from the queue + message = await asyncio.wait_for(self.queue.get(), timeout=timeout) + if message is None: + continue + + # Send message to the hub + await self.job.log(message.levelno, message.getMessage())