git.ipfire.org Git - pakfire.git/commitdiff
daemon: Add logging during the build job
author Michael Tremer <michael.tremer@ipfire.org>
Sun, 29 May 2022 12:09:36 +0000 (12:09 +0000)
committer Michael Tremer <michael.tremer@ipfire.org>
Sun, 29 May 2022 12:09:36 +0000 (12:09 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.py

index dab977c0b67ac25dfff02326539a732a1713c5bd..cea06d980813e073105a7c205cafa6db18c7591e 100644
@@ -4,6 +4,7 @@ import asyncio
 import functools
 import json
 import logging
+import logging.handlers
 import multiprocessing
 import setproctitle
 import signal
@@ -240,14 +241,36 @@ class Worker(multiprocessing.Process):
                if not pkg:
                        raise ValueError("Did not received a package URL")
 
+               # Connect to the hub
+               self.job = await self.hub.job(job_id)
+
+               # Setup build logger
+               logger = BuildLogger(self.log, self.job)
+
+               # Send a new status
+               await self.job.status("building")
+
+               # Run the build
+               build = self._build(pkg, arch=arch,
+                       logger=logger._log, build_id=job_id)
+
+               # Wait until the build process is done and stream the log in the meantime
+               while not build.done():
+                       await logger.stream(timeout=1)
+
+       def _build(self, pkg, arch=None, logger=None, **kwargs):
+               """
+                       Sets up a new Pakfire instance and runs it in a new thread.
+
+                       This method returns an asyncio.Task object which can be used to track
+                       whether this job is still running.
+               """
                # Setup Pakfire instance
                try:
                        p = _pakfire.Pakfire(
                                conf=self.pakfire_conf,
                                arch=arch,
-
-                               # Set up logging
-                               #logger=logger.log,
+                               logger=logger,
 
                                # Enable build mode and disable snapshots
                                build=True,
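
The wait-and-stream loop above polls the build task once per second while forwarding queued log messages. As a rough, self-contained sketch of that pattern (forward() stands in for job.log() on the hub, and the TimeoutError handling is an assumption about how the 1-second poll is meant to behave, since an empty queue would otherwise raise out of the loop):

import asyncio

async def stream_until_done(build, queue, forward):
    # Keep draining the log queue until the build task has finished
    while not build.done():
        try:
            record = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            continue  # nothing was logged in the last second; re-check done()

        # forward() is a placeholder for job.log(levelno, message) on the hub
        await forward(record.levelno, record.getMessage())

    # Await the task so any exception raised inside the build propagates
    await build
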
@@ -258,8 +281,11 @@ class Worker(multiprocessing.Process):
                        # Delete the configuration file
                        os.unlink(self.pakfire_conf)
 
-               # Run the build
-               p.build(pkg, build_id=job_id)
+               # Run the build in a new thread
+               thread = asyncio.to_thread(p.build, pkg, **kwargs)
+
+               # Return a task
+               return asyncio.create_task(thread)
 
        def shutdown(self):
                self.log.debug("Shutting down worker %s" % self.pid)
@@ -302,3 +328,46 @@ class Worker(multiprocessing.Process):
 
                # Return the path
                return f.name
+
+
+class BuildLogger(object):
+       """
+               This class bundles all logging for a single build job: messages are queued
+               for streaming to the hub and also written to a spooled temporary logfile.
+       """
+       def __init__(self, log, job):
+               self.log = log
+               self.job = job
+
+               # Create a logfile
+               self.logfile = tempfile.SpooledTemporaryFile(mode="w+", max_size=1048576)
+
+               # Create a FIFO queue to buffer any log messages
+               self.queue = asyncio.Queue()
+
+               # Create a new logger
+               self.logger = self.log.getChild(self.job.id)
+               self.logger.propagate = False
+               self.logger.setLevel(logging.DEBUG)
+
+               # Log everything to the queue
+               handler = logging.handlers.QueueHandler(self.queue)
+               self.logger.addHandler(handler)
+
+               # Log everything to the file
+               handler = logging.StreamHandler(self.logfile)
+               self.logger.addHandler(handler)
+
+       def _log(self, level, message):
+               return self.logger.log(level, message)
+
+       async def stream(self, timeout=0):
+               self.log.debug("Log streamer started")
+
+               while True:
+                       # Fetch a message from the queue
+                       message = await asyncio.wait_for(self.queue.get(), timeout=timeout)
+                       if message is None:
+                               continue
+
+                       # Send message to the hub
+                       await self.job.log(message.levelno, message.getMessage())
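
To show the BuildLogger wiring in isolation: a per-job child logger fans every record out to two handlers, a QueueHandler feeding an asyncio.Queue (for streaming to the hub) and a StreamHandler writing to a SpooledTemporaryFile (kept in memory until it grows past max_size). The sketch below uses made-up names and keeps all logging on the event loop thread; it illustrates the standard library pieces, not the daemon's actual API:

import asyncio
import logging
import logging.handlers
import tempfile

def make_job_logger(parent, job_id):
    # Queue for streaming plus a spooled file holding the complete log
    queue = asyncio.Queue()
    logfile = tempfile.SpooledTemporaryFile(mode="w+", max_size=1048576)

    logger = parent.getChild(job_id)
    logger.propagate = False
    logger.setLevel(logging.DEBUG)

    # Every record is pushed onto the queue ...
    logger.addHandler(logging.handlers.QueueHandler(queue))

    # ... and also written to the spooled logfile
    logger.addHandler(logging.StreamHandler(logfile))

    return logger, queue, logfile

async def main():
    logger, queue, logfile = make_job_logger(
        logging.getLogger("pakfire.daemon"), "job-1")

    logger.info("build started")

    # Records arrive on the queue as logging.LogRecord objects
    record = await queue.get()
    print(record.levelno, record.getMessage())   # 20 build started

asyncio.run(main())
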