]> git.ipfire.org Git - pbs.git/blobdiff - src/buildservice/__init__.py
backend: Move cleanup job into the main thread
[pbs.git] / src / buildservice / __init__.py
index 496f1b0b1e4ff1907b24a332fb2e4e91da2404ed..862ed46b42aa615af273779870a011f73a24d523 100644 (file)
@@ -1,10 +1,16 @@
 #!/usr/bin/python
 
+import aiofiles
 import asyncio
 import configparser
+import datetime
+import inspect
 import logging
 import os
 import pakfire
+import shutil
+import ssl
+import systemd.journal
 import tempfile
 import urllib.parse
 
@@ -13,26 +19,28 @@ from . import bugtracker
 from . import builders
 from . import builds
 from . import cache
+from . import config
 from . import database
 from . import distribution
 from . import events
-from . import jobqueue
+from . import httpclient
 from . import jobs
 from . import keys
-from . import logs
+from . import logstreams
 from . import messages
 from . import mirrors
 from . import packages
+from . import ratelimiter
+from . import releasemonitoring
 from . import repository
 from . import settings
 from . import sessions
 from . import sources
-from . import updates
 from . import uploads
 from . import users
 
-log = logging.getLogger("backend")
-log.propagate = 1
+# Setup logging
+log = logging.getLogger("pbs")
 
 # Import version
 from .__version__ import VERSION as __version__
@@ -43,13 +51,24 @@ from .constants import *
 class Backend(object):
        version = __version__
 
-       def __init__(self, config_file=None):
-               # Read configuration file.
+       # A list of any background tasks
+       __tasks = set()
+
+       def __init__(self, config_file, test=False):
+               self.test = test
+
+               # Read configuration file
                self.config = self.read_config(config_file)
 
+               # Fetch the base path
+               self.basepath = self.config.get("global", "basepath")
+
                # Global pakfire settings (from database).
                self.settings = settings.Settings(self)
 
+               # Initialize the HTTP Client
+               self.httpclient = httpclient.HTTPClient(self)
+
                self.aws         = aws.AWS(self)
                self.builds      = builds.Builds(self)
                self.cache       = cache.Cache(self)
@@ -57,60 +76,53 @@ class Backend(object):
                self.builders    = builders.Builders(self)
                self.distros     = distribution.Distributions(self)
                self.events      = events.Events(self)
-               self.jobqueue    = jobqueue.JobQueue(self)
                self.keys        = keys.Keys(self)
+               self.logstreams  = logstreams.LogStreams(self)
                self.messages    = messages.Messages(self)
                self.mirrors     = mirrors.Mirrors(self)
                self.packages    = packages.Packages(self)
+               self.monitorings = releasemonitoring.Monitorings(self)
+               self.ratelimiter = ratelimiter.RateLimiter(self)
                self.repos       = repository.Repositories(self)
                self.sessions    = sessions.Sessions(self)
                self.sources     = sources.Sources(self)
-               self.updates     = updates.Updates(self)
                self.uploads     = uploads.Uploads(self)
                self.users       = users.Users(self)
 
                # Open a connection to bugzilla.
-               self.bugzilla    = bugtracker.Bugzilla(self)
+               self.bugzilla          = bugtracker.Bugzilla(self)
 
-       @lazy_property
-       def _environment_configuration(self):
-               env = {}
+               # Create a temporary directory
+               self._create_tmp_path()
 
-               # Get database configuration
-               env["database"] = {
-                       "name"     : os.environ.get("PBS_DATABASE_NAME"),
-                       "hostname" : os.environ.get("PBS_DATABASE_HOSTNAME"),
-                       "user"     : os.environ.get("PBS_DATABASE_USER"),
-                       "password" : os.environ.get("PBS_DATABASE_PASSWORD"),
-               }
+               log.info("Pakfire Build Service initialized at %s" % self.basepath)
 
-               return env
+       def launch_background_tasks(self):
+               # Launch some initial tasks
+               self.run_task(self.users.generate_vapid_keys)
+               self.run_task(self.builders.autoscale)
 
-       def read_config(self, path):
-               c = configparser.SafeConfigParser()
+               # Regularly sync data to the mirrors
+               self.run_periodic_task(300, self.sync)
 
-               # Import configuration from environment
-               for section in self._environment_configuration:
-                       c.add_section(section)
+               # Regularly check the mirrors
+               self.run_periodic_task(300, self.mirrors.check)
 
-                       for k in self._environment_configuration[section]:
-                               c.set(section, k, self._environment_configuration[section][k] or "")
+               # Regularly fetch sources
+               self.run_periodic_task(300, self.sources.fetch)
 
-               # Load default configuration file first
-               paths = [
-                       os.path.join(CONFIGSDIR, "pbs.conf"),
-               ]
+               # Regularly check for new releases
+               self.run_periodic_task(300, self.monitorings.check)
 
-               if path:
-                       paths.append(path)
+               # Cleanup regularly
+               self.run_periodic_task(3600, self.cleanup)
 
-               # Load all configuration files
-               for path in paths:
-                       if os.path.exists(path):
-                               log.debug("Loading configuration from %s" % path)
-                               c.read(path)
-                       else:
-                               log.error("No such file %s" % path)
+       def read_config(self, path):
+               c = configparser.ConfigParser()
+
+               # Read configuration from file
+               if path:
+                       c.read(path)
 
                return c
 
@@ -126,61 +138,153 @@ class Backend(object):
 
                log.debug("Connecting to database %s @ %s" % (name, hostname))
 
-               return database.Connection(hostname, name, user=user, password=password)
+               return database.Connection(self, hostname, name, user=user, password=password)
 
-       def path_to_url(self, path):
+       def _create_tmp_path(self):
                """
-                       Takes a path to a file on the file system and converts it into a URL
+                       This function will create some temporary space with the correct permissions.
+               """
+               path = self.path("tmp")
+
+               try:
+                       os.mkdir(path, mode=0o1777)
+
+               # Ignore if the directory already exists
+               except FileExistsError:
+                       pass
+
+       def path(self, *args):
+               """
+                       Takes a relative path and makes it absolute
+               """
+               return os.path.join(self.basepath, *args)
+
+       def url_to(self, url):
+               """
+                       Takes a relative URL and makes it absolute
                """
                # The base URL
                baseurl = self.settings.get("baseurl")
 
+               # Join it all together
+               return urllib.parse.urljoin(baseurl, url)
+
+       def path_to_url(self, path):
+               """
+                       Takes a path to a file on the file system and converts it into a URL
+               """
                # Path to package
                path = os.path.join(
-                       "files", os.path.relpath(path, PAKFIRE_DIR),
+                       "files", os.path.relpath(path, self.basepath),
                )
 
-               # Join it all together
-               return urllib.parse.urljoin(baseurl, path)
+               return self.url_to(path)
 
-       def pakfire(self, config, offline=True, **kwargs):
+       def pakfire(self, *args, **kwargs):
                """
                        Launches a new Pakfire instance with the given configuration
                """
-               log.debug("Launching pakfire with configuration:\n%s" % config)
+               return config.PakfireConfig(self, *args, **kwargs)
 
-               # Write configuration to file
-               t = self._write_tempfile(config)
+       # Functions to run something in background
 
-               # Launch a new Pakfire instance
-               try:
-                       return pakfire.Pakfire(conf=t, logger=log.log, offline=offline, **kwargs)
+       def run_task(self, callback, *args):
+               """
+                       Runs the given coroutine in the background
+               """
+               # Create a new task
+               task = asyncio.create_task(callback(*args))
+
+               # Keep a reference to the task and remove it when the task has finished
+               self.__tasks.add(task)
+               task.add_done_callback(self.__tasks.discard)
+
+               return task
+
+       def run_periodic_task(self, delay, callback, *args):
+               """
+                       Calls the given callback periodically in the background
+               """
+               self.run_task(self._periodic_task, delay, callback, *args)
+
+       async def _periodic_task(self, delay, callback, *args):
+               """
+                       Helper function for run_periodic_task() that will call the given
+                       callback regulary after the timer has expired.
+               """
+               log.debug("Periodic callback %r started" % callback)
+
+               while True:
+                       try:
+                               ret = callback(*args)
+
+                               # Await ret if callback is a coroutine
+                               if inspect.isawaitable(ret):
+                                       await ret
 
-               finally:
-                       # Delete the configuration file
-                       os.unlink(t)
+                       except Exception as e:
+                               log.error("Exception in periodic callback %r" % callback, exc_info=True)
+
+                       # Wait a little moment
+                       await asyncio.sleep(delay)
 
        # Commands
 
-       async def command(self, *args, krb5_auth=False, **kwargs):
+       async def command(self, *command, krb5_auth=False, **kwargs):
                """
                        Runs this shell command
                """
-               # Authenticate using Kerberos
-               if krb5_auth:
-                       await self.krb5_auth()
+               async with self.tempdir() as tmp:
+                       # Create a minimal environment
+                       env = {
+                               "HOME"       : os.environ.get("HOME", tmp),
+
+                               # Tell the system where to put temporary files
+                               "TMPDIR"     : tmp,
 
-               log.debug("Running command: %s" % " ".join(args))
+                               # Store any Kerberos credentials here
+                               "KRB5CCNAME" : os.path.join(tmp, ".krb5cc"),
+                       }
+
+                       # Authenticate using Kerberos
+                       if krb5_auth:
+                               await self._krb5_auth(env=env)
+
+                       # Run the command
+                       return await self._command(*command, env=env, **kwargs)
+
+       async def _command(self, *command, return_output=False, input=None, **kwargs):
+               log.debug("Running command: %s" % " ".join(command))
 
                # Fork child process
                process = await asyncio.create_subprocess_exec(
-                       *args,
-                       stdin=asyncio.subprocess.DEVNULL,
+                       *command,
+                       stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
                        stdout=asyncio.subprocess.PIPE,
-                       stderr=asyncio.subprocess.STDOUT,
+                       stderr=asyncio.subprocess.PIPE,
                        **kwargs,
                )
 
+               # Send input
+               if input:
+                       # Convert to bytes
+                       if not isinstance(input, bytes):
+                               input = input.encode()
+
+                       # Write the entire data chunk by chunk
+                       while input:
+                               chunk, input = input[0:64], input[64:]
+                               if not chunk:
+                                       break
+
+                               process.stdin.write(chunk)
+                               await process.stdin.drain()
+
+                       # Close the input once we are done
+                       process.stdin.close()
+
+               stdout = []
+
                # Fetch output of command and send it to the logger
                while True:
                        line = await process.stdout.readline()
@@ -193,12 +297,36 @@ class Backend(object):
                        # Strip newline
                        line = line.rstrip()
 
-                       log.info(line)
+                       # Log the output
+                       log.debug(line)
+
+                       # Store the output if requested
+                       if return_output:
+                               stdout.append(line)
 
                # Wait until the process has finished
-               await process.wait()
+               returncode = await process.wait()
+
+               # Check the return code
+               if returncode:
+                       # Fetch any output from the standard error output
+                       stderr = await process.stderr.read()
+                       stderr = stderr.decode()
+
+                       # Log the error
+                       log.error("Error running command: %s (code=%s)" % (
+                               " ".join(command), returncode,
+                       ))
+                       if stderr:
+                               log.error(stderr)
+
+                       raise CommandExecutionError(returncode, stderr)
+
+               # Return output if requested
+               if return_output:
+                       return "\n".join(stdout)
 
-       async def krb5_auth(self):
+       async def _krb5_auth(self, **kwargs):
                log.debug("Performing Kerberos authentication...")
 
                # Fetch path to keytab
@@ -214,46 +342,148 @@ class Backend(object):
                        return
 
                # Fetch a Kerberos ticket
-               await self.command(
-                       "kinit", "-k", "-t", keytab, principal,
+               await self._command(
+                       "kinit", "-k", "-t", keytab, principal, **kwargs,
                )
 
-       async def copy(self, src, dst):
+       async def exists(self, *args, **kwargs):
+               """
+                       Checks whether a file exists
+               """
+               return await asyncio.to_thread(os.path.exists, *args, **kwargs)
+
+       async def makedirs(self, path, **kwargs):
+               """
+                       Creates a new directory
+               """
+               return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
+       async def copy(self, src, dst, mode=None):
                """
                        Copies a file from src to dst
                """
                log.debug("Copying %s to %s" % (src, dst))
 
-               path = os.path.dirname(dst)
-
-               # Create destination path (if it does not exist)
-               try:
-                       await asyncio.to_thread(os.makedirs, path)
-               except FileExistsError:
-                       pass
+               # Create parent directory
+               await self.make_parent_directory(dst)
 
                # Copy data without any metadata
                await asyncio.to_thread(shutil.copyfile, src, dst)
 
+               # Set mode
+               if mode:
+                       await asyncio.to_thread(os.chmod, dst, mode)
+
+       async def move(self, src, dst, **kwargs):
+               """
+                       Moves something from src to dst
+               """
+               log.debug("Moving %s to %s" % (src, dst))
+
+               # Create parent directory
+               await self.make_parent_directory(dst)
+
+               # Move!
+               await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
+       async def make_parent_directory(self, path):
+               """
+                       Creates the parent directory of path
+               """
+               path = os.path.dirname(path)
+
+               return await self.makedirs(path, exist_ok=True)
+
        async def unlink(self, path):
                """
                        Unlinks path
                """
+               # Normalize the path
+               path = os.path.abspath(path)
+
+               # Check if the path is within our base directory
+               if not path.startswith(self.basepath):
+                       raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
                log.debug("Unlinking %s" % path)
 
+               await asyncio.to_thread(self._unlink, path)
+
+       def _unlink(self, path):
+               # Unlink the file we were asked to unlink
                try:
-                       await asycio.to_thread(os.unlink, path)
+                       os.unlink(path)
                except OSError as e:
+                       return
+
+               # Try to delete any empty parent directories
+               while True:
+                       # Get the parent directory
+                       path = os.path.dirname(path)
+
+                       # Break if we reached the base path
+                       if path == self.basepath or path == self.path("tmp"):
+                               break
+
+                       # Call rmdir()
+                       try:
+                               os.rmdir(path)
+                       except OSError as e:
+                               break
+
+                       log.debug("  Cleaned up %s..." % path)
+
+       async def rmtree(self, path):
+               """
+                       Removes a directory recursively
+               """
+               # Normalize the path
+               path = os.path.abspath(path)
+
+               # Check if the path is within our base directory
+               if not path.startswith(self.basepath):
+                       raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+               log.debug("Removing %s..." % path)
+
+               try:
+                       await asyncio.to_thread(shutil.rmtree, path)
+
+               # Ignore if path didn't exist in the first place
+               except FileNotFoundError:
                        pass
 
+       async def chmod(self, *args, **kwargs):
+               return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
+       def tempfile(self, mode="w+b", delete=True):
+               """
+                       Returns an open file handle to a new temporary file
+               """
+               path = self.path("tmp")
+
+               return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+
+       def tempdir(self, **kwargs):
+               """
+                       Asynchronously creates a new temporary directory
+               """
+               # Default to our own tmp directory
+               path = self.path("tmp")
+
+               return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)
+
        def _write_tempfile(self, content):
                """
                        Writes the content to a temporary file and returns its path
                """
-               t = tempfile.NamedTemporaryFile(delete=False)
+               t = self.tempfile(delete=False)
 
                # Write the content
-               t.write(content.encode())
+               if content:
+                       t.write(content.encode())
+
+               # Close the file
                t.close()
 
                return t.name
@@ -262,55 +492,146 @@ class Backend(object):
                """
                        Opens a package and returns the archive
                """
-               return await asyncio.to_thread(self._open, path)
-
-       def _open(self, path):
-               # Create a dummy Pakfire instance
-               p = pakfire.Pakfire(offline=True)
+               log.debug("Opening %s..." % path)
 
                # Open the archive
-               return p.open(path)
+               async with self.pakfire() as p:
+                       return await asyncio.to_thread(p.open, path)
 
-       def delete_file(self, path, not_before=None):
-               self.db.execute("INSERT INTO queue_delete(path, not_before) \
-                       VALUES(%s, %s)", path, not_before)
+       @property
+       def ssl_context(self):
+               # Create SSL context
+               context = ssl.create_default_context()
 
-       def cleanup_files(self):
-               query = self.db.query("SELECT * FROM queue_delete \
-                       WHERE (not_before IS NULL OR not_before <= NOW())")
+               # Fetch client certificate
+               certificate = self.settings.get("client-certificate", None)
+               key         = self.settings.get("client-key", None)
 
-               for row in query:
-                       if not row.path:
-                               continue
+               # Apply client certificate
+               if certificate and key:
+                       with tempfile.NamedTemporaryFile(mode="w") as f_cert:
+                               f_cert.write(certificate)
+                               f_cert.flush()
 
-                       path = row.path
+                               with tempfile.NamedTemporaryFile(mode="w") as f_key:
+                                       f_key.write(key)
+                                       f_key.flush()
 
-                       if not path or not path.startswith("%s/" % PAKFIRE_DIR):
-                               log.warning("Cannot delete file outside of the tree")
-                               continue
+                                       context.load_cert_chain(f_cert.name, f_key.name)
 
-                       try:
-                               logging.debug("Removing %s..." % path)
-                               os.unlink(path)
-                       except OSError as e:
-                               logging.error("Could not remove %s: %s" % (path, e))
+               return context
 
-                       while True:                     
-                               path = os.path.dirname(path)
+       async def load_certificate(self, certfile, keyfile):
+               with self.db.transaction():
+                       # Load certificate
+                       with open(certfile) as f:
+                               self.settings.set("client-certificate", f.read())
 
-                               # Stop if we are running outside of the tree.
-                               if not path.startswith(PAKFIRE_DIR):
-                                       break
+                       # Load key file
+                       with open(keyfile) as f:
+                               self.settings.set("client-key", f.read())
 
-                               # If the directory is not empty, we cannot remove it.
-                               if os.path.exists(path) and os.listdir(path):
-                                       break
+                       log.info("Updated certificates")
+
+       async def cleanup(self):
+               """
+                       Called regularly to cleanup any left-over resources
+               """
+               # Messages
+               await self.messages.queue.cleanup()
+
+               # Sessions
+               await self.sessions.cleanup()
+
+               # Uploads
+               await self.uploads.cleanup()
+
+       async def sync(self):
+               """
+                       Syncs any repository that should be mirrored
+               """
+               log.info("Syncing mirrors...")
+
+               # Fetch the sync target
+               target = self.settings.get("sync-target")
+               if not target:
+                       log.warning("No sync target configured")
+                       return 0
+
+               # Update the timestamp
+               await self._update_timestamp()
+
+               commandline = [
+                       "rsync",
+
+                       # Show what is being transferred
+                       "--verbose",
+
+                       # Compress any transferred data
+                       "--compress",
+
+                       # Enable archive mode
+                       "--archive",
+
+                       # Preserve hardlinks, ACLs & XATTRs
+                       "--hard-links",
+                       "--acls",
+                       "--xattrs",
+
+                       # Delete any files that we have deleted
+                       "--delete",
+                       "--delete-excluded",
+
+                       # Remove any empty directories
+                       "--prune-empty-dirs",
+
+                       # Make the transaction atomic
+                       "--delay-updates",
+
+                       # Add source & target
+                       "%s/" % self.basepath,
+                       target,
+
+                       # Sync the .timestamp
+                       "--include=.timestamp",
+               ]
+
+               # Add all mirrored repositories
+               for repo in self.repos.mirrored:
+                       path = os.path.relpath(repo.local_path(), self.basepath)
+
+                       commandline.append("--include=%s***" % path)
+
+               # Exclude everything that hasn't been included
+               commandline += ("--include=*/", "--exclude=*")
+
+               # Run command
+               await self.command(*commandline, krb5_auth=True)
+
+       async def _update_timestamp(self):
+               """
+                       Updates the .timestamp file in the root of the exported files
+               """
+               # Make filename
+               path = os.path.join(self.basepath, ".timestamp")
+
+               t = datetime.datetime.utcnow()
+
+               # Write the current time as seconds since epoch
+               async with aiofiles.open(path, mode="w") as f:
+                       await f.write(t.strftime("%s"))
+
+
+def setup_logging():
+       """
+               Configures the logger for the buildservice backend
+       """
+       # Log everything to journal
+       handler = systemd.journal.JournalHandler(
+               SYSLOG_IDENTIFIER="pakfire-build-service",
+       )
+       log.addHandler(handler)
 
-                               try:
-                                       logging.debug("Removing %s..." % path)
-                                       os.rmdir(path)
-                               except OSError as e:
-                                       logging.error("Could not remove %s: %s" % (path, e))
-                                       break
 
-                       self.db.execute("DELETE FROM queue_delete WHERE id = %s", row.id)
+# Setup logging
+setup_logging()