git.ipfire.org Git - pbs.git/blobdiff - src/buildservice/__init__.py
backend: Move cleanup job into the main thread
[pbs.git] / src / buildservice / __init__.py
index ed09699911bada04978166af1d3171c21338d6c0..862ed46b42aa615af273779870a011f73a24d523 100644 (file)
@@ -1,12 +1,15 @@
 #!/usr/bin/python
 
+import aiofiles
 import asyncio
 import configparser
+import datetime
 import inspect
 import logging
 import os
 import pakfire
 import shutil
+import ssl
 import systemd.journal
 import tempfile
 import urllib.parse
@@ -20,12 +23,15 @@ from . import config
 from . import database
 from . import distribution
 from . import events
-from . import jobqueue
+from . import httpclient
 from . import jobs
 from . import keys
+from . import logstreams
 from . import messages
 from . import mirrors
 from . import packages
+from . import ratelimiter
+from . import releasemonitoring
 from . import repository
 from . import settings
 from . import sessions
@@ -33,7 +39,8 @@ from . import sources
 from . import uploads
 from . import users
 
-log = logging.getLogger("pakfire.buildservice")
+# Set up logging
+log = logging.getLogger("pbs")
 
 # Import version
 from .__version__ import VERSION as __version__
@@ -47,8 +54,10 @@ class Backend(object):
        # A list of any background tasks
        __tasks = set()
 
-       def __init__(self, config_file):
-               # Read configuration file.
+       def __init__(self, config_file, test=False):
+               self.test = test
+
+               # Read configuration file
                self.config = self.read_config(config_file)
 
                # Fetch the base path
@@ -57,6 +66,9 @@ class Backend(object):
                # Global pakfire settings (from database).
                self.settings = settings.Settings(self)
 
+               # Initialize the HTTP Client
+               self.httpclient = httpclient.HTTPClient(self)
+
                self.aws         = aws.AWS(self)
                self.builds      = builds.Builds(self)
                self.cache       = cache.Cache(self)
@@ -64,11 +76,13 @@ class Backend(object):
                self.builders    = builders.Builders(self)
                self.distros     = distribution.Distributions(self)
                self.events      = events.Events(self)
-               self.jobqueue    = jobqueue.JobQueue(self)
                self.keys        = keys.Keys(self)
+               self.logstreams  = logstreams.LogStreams(self)
                self.messages    = messages.Messages(self)
                self.mirrors     = mirrors.Mirrors(self)
                self.packages    = packages.Packages(self)
+               self.monitorings = releasemonitoring.Monitorings(self)
+               self.ratelimiter = ratelimiter.RateLimiter(self)
                self.repos       = repository.Repositories(self)
                self.sessions    = sessions.Sessions(self)
                self.sources     = sources.Sources(self)
@@ -76,10 +90,33 @@ class Backend(object):
                self.users       = users.Users(self)
 
                # Open a connection to bugzilla.
-               self.bugzilla    = bugtracker.Bugzilla(self)
+               self.bugzilla          = bugtracker.Bugzilla(self)
+
+               # Create a temporary directory
+               self._create_tmp_path()
 
                log.info("Pakfire Build Service initialized at %s" % self.basepath)
 
+       def launch_background_tasks(self):
+               # Launch some initial tasks
+               self.run_task(self.users.generate_vapid_keys)
+               self.run_task(self.builders.autoscale)
+
+               # Regularly sync data to the mirrors
+               self.run_periodic_task(300, self.sync)
+
+               # Regularly check the mirrors
+               self.run_periodic_task(300, self.mirrors.check)
+
+               # Regularly fetch sources
+               self.run_periodic_task(300, self.sources.fetch)
+
+               # Regularly check for new releases
+               self.run_periodic_task(300, self.monitorings.check)
+
+               # Cleanup regularly
+               self.run_periodic_task(3600, self.cleanup)
+
        def read_config(self, path):
                c = configparser.ConfigParser()
 
@@ -101,7 +138,20 @@ class Backend(object):
 
                log.debug("Connecting to database %s @ %s" % (name, hostname))
 
-               return database.Connection(hostname, name, user=user, password=password)
+               return database.Connection(self, hostname, name, user=user, password=password)
+
+       def _create_tmp_path(self):
+               """
+                       This function will create some temporary space with the correct permissions.
+               """
+               path = self.path("tmp")
+
+               try:
+                       os.mkdir(path, mode=0o1777)
+
+               # Ignore if the directory already exists
+               except FileExistsError:
+                       pass
 
        def path(self, *args):
                """
@@ -109,20 +159,26 @@ class Backend(object):
                """
                return os.path.join(self.basepath, *args)
 
-       def path_to_url(self, path):
+       def url_to(self, url):
                """
-                       Takes a path to a file on the file system and converts it into a URL
+                       Takes a relative URL and makes it absolute
                """
                # The base URL
                baseurl = self.settings.get("baseurl")
 
+               # Join it all together
+               return urllib.parse.urljoin(baseurl, url)
+
+       def path_to_url(self, path):
+               """
+                       Takes a path to a file on the file system and converts it into a URL
+               """
                # Path to package
                path = os.path.join(
                        "files", os.path.relpath(path, self.basepath),
                )
 
-               # Join it all together
-               return urllib.parse.urljoin(baseurl, path)
+               return self.url_to(path)
 
        def pakfire(self, *args, **kwargs):
                """
@@ -159,9 +215,6 @@ class Backend(object):
                log.debug("Periodic callback %r started" % callback)
 
                while True:
-                       # Wait a little moment
-                       await asyncio.sleep(delay)
-
                        try:
                                ret = callback(*args)
 
@@ -172,27 +225,66 @@ class Backend(object):
                        except Exception as e:
                                log.error("Exception in periodic callback %r" % callback, exc_info=True)
 
+                       # Wait a little moment
+                       await asyncio.sleep(delay)
+
        # Commands
 
-       async def command(self, *args, krb5_auth=False, **kwargs):
+       async def command(self, *command, krb5_auth=False, **kwargs):
                """
                        Runs this shell command
                """
-               # Authenticate using Kerberos
-               if krb5_auth:
-                       await self.krb5_auth()
+               async with self.tempdir() as tmp:
+                       # Create a minimal environment
+                       env = {
+                               "HOME"       : os.environ.get("HOME", tmp),
+
+                               # Tell the system where to put temporary files
+                               "TMPDIR"     : tmp,
+
+                               # Store any Kerberos credentials here
+                               "KRB5CCNAME" : os.path.join(tmp, ".krb5cc"),
+                       }
+
+                       # Authenticate using Kerberos
+                       if krb5_auth:
+                               await self._krb5_auth(env=env)
+
+                       # Run the command
+                       return await self._command(*command, env=env, **kwargs)
 
-               log.debug("Running command: %s" % " ".join(args))
+       async def _command(self, *command, return_output=False, input=None, **kwargs):
+               log.debug("Running command: %s" % " ".join(command))
 
                # Fork child process
                process = await asyncio.create_subprocess_exec(
-                       *args,
-                       stdin=asyncio.subprocess.DEVNULL,
+                       *command,
+                       stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
                        stdout=asyncio.subprocess.PIPE,
-                       stderr=asyncio.subprocess.STDOUT,
+                       stderr=asyncio.subprocess.PIPE,
                        **kwargs,
                )
 
+               # Send input
+               if input:
+                       # Convert to bytes
+                       if not isinstance(input, bytes):
+                               input = input.encode()
+
+                       # Write the entire data chunk by chunk
+                       while input:
+                               chunk, input = input[0:64], input[64:]
+                               if not chunk:
+                                       break
+
+                               process.stdin.write(chunk)
+                               await process.stdin.drain()
+
+                       # Close the input once we are done
+                       process.stdin.close()
+
+               stdout = []
+
                # Fetch output of command and send it to the logger
                while True:
                        line = await process.stdout.readline()
@@ -205,12 +297,36 @@ class Backend(object):
                        # Strip newline
                        line = line.rstrip()
 
-                       log.info(line)
+                       # Log the output
+                       log.debug(line)
+
+                       # Store the output if requested
+                       if return_output:
+                               stdout.append(line)
 
                # Wait until the process has finished
-               await process.wait()
+               returncode = await process.wait()
+
+               # Check the return code
+               if returncode:
+                       # Fetch any output from the standard error output
+                       stderr = await process.stderr.read()
+                       stderr = stderr.decode()
 
-       async def krb5_auth(self):
+                       # Log the error
+                       log.error("Error running command: %s (code=%s)" % (
+                               " ".join(command), returncode,
+                       ))
+                       if stderr:
+                               log.error(stderr)
+
+                       raise CommandExecutionError(returncode, stderr)
+
+               # Return output if requested
+               if return_output:
+                       return "\n".join(stdout)
+
+       async def _krb5_auth(self, **kwargs):
                log.debug("Performing Kerberos authentication...")
 
                # Fetch path to keytab
@@ -226,43 +342,142 @@ class Backend(object):
                        return
 
                # Fetch a Kerberos ticket
-               await self.command(
-                       "kinit", "-k", "-t", keytab, principal,
+               await self._command(
+                       "kinit", "-k", "-t", keytab, principal, **kwargs,
                )
 
-       async def copy(self, src, dst):
+       async def exists(self, *args, **kwargs):
+               """
+                       Checks whether a file exists
+               """
+               return await asyncio.to_thread(os.path.exists, *args, **kwargs)
+
+       async def makedirs(self, path, **kwargs):
+               """
+                       Creates a new directory (including any missing parent directories)
+               """
+               return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
+       async def copy(self, src, dst, mode=None):
                """
                        Copies a file from src to dst
                """
                log.debug("Copying %s to %s" % (src, dst))
 
-               path = os.path.dirname(dst)
-
-               # Create destination path (if it does not exist)
-               try:
-                       await asyncio.to_thread(os.makedirs, path)
-               except FileExistsError:
-                       pass
+               # Create parent directory
+               await self.make_parent_directory(dst)
 
                # Copy data without any metadata
                await asyncio.to_thread(shutil.copyfile, src, dst)
 
+               # Set mode
+               if mode:
+                       await asyncio.to_thread(os.chmod, dst, mode)
+
+       async def move(self, src, dst, **kwargs):
+               """
+                       Moves something from src to dst
+               """
+               log.debug("Moving %s to %s" % (src, dst))
+
+               # Create parent directory
+               await self.make_parent_directory(dst)
+
+               # Move!
+               await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
+       async def make_parent_directory(self, path):
+               """
+                       Creates the parent directory of path
+               """
+               path = os.path.dirname(path)
+
+               return await self.makedirs(path, exist_ok=True)
+
        async def unlink(self, path):
                """
                        Unlinks path
                """
+               # Normalize the path
+               path = os.path.abspath(path)
+
+               # Check if the path is within our base directory (NOTE: this is a plain string prefix match, so a sibling such as "<basepath>-old" passes too)
+               if not path.startswith(self.basepath):
+                       raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
                log.debug("Unlinking %s" % path)
 
+               await asyncio.to_thread(self._unlink, path)
+
+       def _unlink(self, path):
+               # Unlink the file we were asked to unlink
                try:
-                       await asyncio.to_thread(os.unlink, path)
+                       os.unlink(path)
                except OSError as e:
+                       return
+
+               # Try to delete any empty parent directories
+               while True:
+                       # Get the parent directory
+                       path = os.path.dirname(path)
+
+                       # Break if we reached the base path
+                       if path == self.basepath or path == self.path("tmp"):
+                               break
+
+                       # Call rmdir()
+                       try:
+                               os.rmdir(path)
+                       except OSError as e:
+                               break
+
+                       log.debug("  Cleaned up %s..." % path)
+
+       async def rmtree(self, path):
+               """
+                       Removes a directory recursively
+               """
+               # Normalize the path
+               path = os.path.abspath(path)
+
+               # Check if the path is within our base directory (NOTE: this is a plain string prefix match, so a sibling such as "<basepath>-old" passes too)
+               if not path.startswith(self.basepath):
+                       raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+               log.debug("Removing %s..." % path)
+
+               try:
+                       await asyncio.to_thread(shutil.rmtree, path)
+
+               # Ignore if path didn't exist in the first place
+               except FileNotFoundError:
                        pass
 
+       async def chmod(self, *args, **kwargs):
+               return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
+       def tempfile(self, mode="w+b", delete=True):
+               """
+                       Returns an async context manager (aiofiles) that opens a new temporary file in our tmp directory
+               """
+               path = self.path("tmp")
+
+               return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+
+       def tempdir(self, **kwargs):
+               """
+                       Asynchronously creates a new temporary directory
+               """
+               # Always create it inside our own tmp directory
+               path = self.path("tmp")
+
+               return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)
+
        def _write_tempfile(self, content):
                """
                        Writes the content to a temporary file and returns its path
                """
-               t = tempfile.NamedTemporaryFile(delete=False)
+               t = self.tempfile(delete=False)
 
                # Write the content
                if content:
@@ -277,19 +492,54 @@ class Backend(object):
                """
                        Opens a package and returns the archive
                """
-               return await asyncio.to_thread(self._open, path)
-
-       def _open(self, path):
-               # Create a dummy Pakfire instance
-               p = pakfire.Pakfire(offline=True)
+               log.debug("Opening %s..." % path)
 
                # Open the archive
-               return p.open(path)
+               async with self.pakfire() as p:
+                       return await asyncio.to_thread(p.open, path)
+
+       @property
+       def ssl_context(self):
+               # Create SSL context
+               context = ssl.create_default_context()
+
+               # Fetch client certificate
+               certificate = self.settings.get("client-certificate", None)
+               key         = self.settings.get("client-key", None)
+
+               # Apply client certificate
+               if certificate and key:
+                       with tempfile.NamedTemporaryFile(mode="w") as f_cert:
+                               f_cert.write(certificate)
+                               f_cert.flush()
+
+                               with tempfile.NamedTemporaryFile(mode="w") as f_key:
+                                       f_key.write(key)
+                                       f_key.flush()
+
+                                       context.load_cert_chain(f_cert.name, f_key.name)
+
+               return context
+
+       async def load_certificate(self, certfile, keyfile):
+               with self.db.transaction():
+                       # Load certificate
+                       with open(certfile) as f:
+                               self.settings.set("client-certificate", f.read())
+
+                       # Load key file
+                       with open(keyfile) as f:
+                               self.settings.set("client-key", f.read())
+
+                       log.info("Updated certificates")
 
        async def cleanup(self):
                """
                        Called regularly to cleanup any left-over resources
                """
+               # Messages
+               await self.messages.queue.cleanup()
+
                # Sessions
                await self.sessions.cleanup()
 
@@ -308,11 +558,14 @@ class Backend(object):
                        log.warning("No sync target configured")
                        return 0
 
+               # Update the timestamp
+               await self._update_timestamp()
+
                commandline = [
                        "rsync",
 
                        # Show what is being transferred
-                       #"--verbose",
+                       "--verbose",
 
                        # Compress any transferred data
                        "--compress",
@@ -338,6 +591,9 @@ class Backend(object):
                        # Add source & target
                        "%s/" % self.basepath,
                        target,
+
+                       # Sync the .timestamp
+                       "--include=.timestamp",
                ]
 
                # Add all mirrored repositories
@@ -352,17 +608,24 @@ class Backend(object):
                # Run command
                await self.command(*commandline, krb5_auth=True)
 
+       async def _update_timestamp(self):
+               """
+                       Updates the .timestamp file in the root of the exported files
+               """
+               # Make filename
+               path = os.path.join(self.basepath, ".timestamp")
+
+               t = datetime.datetime.utcnow()
+
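+               # Write the current time as seconds since epoch (NOTE(review): "%s" is platform-specific and reads the naive UTC value as local time - confirm the host runs in UTC)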
+               async with aiofiles.open(path, mode="w") as f:
+                       await f.write(t.strftime("%s"))
+
 
 def setup_logging():
        """
                Configures the logger for the buildservice backend
        """
-       # Do not propagate anything from the build service up to any Pakfire instances
-       log.propagate = 0
-
-       # Enable debug logging
-       log.setLevel(logging.DEBUG)
-
        # Log everything to journal
        handler = systemd.journal.JournalHandler(
                SYSLOG_IDENTIFIER="pakfire-build-service",