import functools
import json
import logging
+import logging.handlers
import multiprocessing
import setproctitle
import signal
if not pkg:
raise ValueError("Did not received a package URL")
+ # Connect to the hub
+ self.job = await self.hub.job(job_id)
+
+ # Setup build logger
+ logger = BuildLogger(self.log, self.job)
+
+ # Send a new status
+ await self.job.status("building")
+
+ # Run the build
+ build = self._build(pkg, arch=arch,
+ logger=logger._log, build_id=job_id)
+
+ # Wait until the build process is done and stream the log in the meantime
+ while not build.done():
+ await logger.stream(timeout=1)
+
+ def _build(self, pkg, arch=None, logger=None, **kwargs):
+ """
+ Sets up a new Pakfire instance and runs it in a new thread.
+
+ This method returns an async.Task() object which can be used to track
+ if this job is still running.
+ """
# Setup Pakfire instance
try:
p = _pakfire.Pakfire(
conf=self.pakfire_conf,
arch=arch,
-
- # Set up logging
- #logger=logger.log,
+ logger=logger,
# Enable build mode and disable snapshots
build=True,
# Delete the configuration file
os.unlink(self.pakfire_conf)
- # Run the build
- p.build(pkg, build_id=job_id)
+ # Run the build in a new thread
+ thread = asyncio.to_thread(p.build, pkg, **kwargs)
+
+ # Return a task
+ return asyncio.create_task(thread)
def shutdown(self):
self.log.debug("Shutting down worker %s" % self.pid)
# Return the path
return f.name
+
+
+class BuildLogger(object):
+ """
+ This class groups together all sorts of logging.
+ """
+ def __init__(self, log, job):
+ self.log = log
+ self.job = job
+
+ # Create a logfile
+ self.logfile = tempfile.SpooledTemporaryFile(mode="w+", max_size=1048576)
+
+ # Create a FIFO queue to buffer any log messages
+ self.queue = asyncio.Queue()
+
+ # Create a new logger
+ self.logger = self.log.getChild(self.job.id)
+ self.logger.propagate = False
+ self.logger.setLevel(logging.DEBUG)
+
+ # Log everything to the queue
+ handler = logging.handlers.QueueHandler(self.queue)
+ self.logger.addHandler(handler)
+
+ # Log everything to the file
+ handler = logging.StreamHandler(self.logfile)
+ self.logger.addHandler(handler)
+
+ def _log(self, level, message):
+ return self.logger.log(level, message)
+
+ async def stream(self, timeout=0):
+ self.log.debug("Log streamer started")
+
+ while True:
+ # Fetch a message from the queue
+ message = await asyncio.wait_for(self.queue.get(), timeout=timeout)
+ if message is None:
+ continue
+
+ # Send message to the hub
+ await self.job.log(message.levelno, message.getMessage())