#!/usr/bin/python
+import aiofiles
import asyncio
import configparser
+import datetime
+import inspect
import logging
import os
import pakfire
+import shutil
+import ssl
+import systemd.journal
import tempfile
+import urllib.parse
from . import aws
from . import bugtracker
from . import builders
from . import builds
from . import cache
+from . import config
from . import database
from . import distribution
from . import events
-from . import jobqueue
+from . import httpclient
from . import jobs
from . import keys
-from . import logs
+from . import logstreams
from . import messages
from . import mirrors
from . import packages
+from . import ratelimiter
+from . import releasemonitoring
from . import repository
from . import settings
from . import sessions
from . import sources
-from . import updates
from . import uploads
from . import users
-log = logging.getLogger("backend")
-log.propagate = 1
+# Setup logging
+log = logging.getLogger("pbs")
# Import version
from .__version__ import VERSION as __version__
class Backend(object):
version = __version__
- def __init__(self, config_file=None):
- # Read configuration file.
+ # A list of any background tasks
+ __tasks = set()
+
def __init__(self, config_file, test=False):
    """
    Sets up the backend and all of its sub-systems

    config_file is handed to read_config(); test is only stored on the
    instance as self.test.
    """
    self.test = test

    # Parse the configuration and remember where we store all our files
    self.config = self.read_config(config_file)
    self.basepath = self.config.get("global", "basepath")

    # Global pakfire settings (from database)
    self.settings = settings.Settings(self)

    # The HTTP client is created before any of the other sub-systems
    self.httpclient = httpclient.HTTPClient(self)

    # All other sub-systems (the order of initialization is kept as it was)
    self.aws = aws.AWS(self)
    self.builds = builds.Builds(self)
    self.cache = cache.Cache(self)
    self.builders = builders.Builders(self)
    self.distros = distribution.Distributions(self)
    self.events = events.Events(self)
    self.keys = keys.Keys(self)
    self.logstreams = logstreams.LogStreams(self)
    self.messages = messages.Messages(self)
    self.mirrors = mirrors.Mirrors(self)
    self.packages = packages.Packages(self)
    self.monitorings = releasemonitoring.Monitorings(self)
    self.ratelimiter = ratelimiter.RateLimiter(self)
    self.repos = repository.Repositories(self)
    self.sessions = sessions.Sessions(self)
    self.sources = sources.Sources(self)
    self.uploads = uploads.Uploads(self)
    self.users = users.Users(self)

    # Open a connection to bugzilla
    self.bugzilla = bugtracker.Bugzilla(self)

    # Make sure that our temporary directory exists
    self._create_tmp_path()

    log.info("Pakfire Build Service initialized at %s" % self.basepath)
def launch_background_tasks(self):
    """
    Starts everything that is supposed to run in the background
    """
    # Tasks that are launched exactly once
    for callback in (self.users.generate_vapid_keys, self.builders.autoscale):
        self.run_task(callback)

    # Tasks that are repeated after the given delay
    periodic_tasks = (
        # Regularly sync data to the mirrors
        (300, self.sync),

        # Regularly check the mirrors
        (300, self.mirrors.check),

        # Regularly fetch sources
        (300, self.sources.fetch),

        # Regularly check for new releases
        (300, self.monitorings.check),

        # Cleanup regularly
        (3600, self.cleanup),
    )

    for delay, callback in periodic_tasks:
        self.run_periodic_task(delay, callback)
+
def read_config(self, path):
    """
    Reads the configuration file at path and returns the parser

    If path is empty, an empty configuration is returned.
    """
    c = configparser.ConfigParser()

    # Read configuration from file
    if path:
        # read() silently skips files it cannot open and returns the list
        # of files that were actually parsed, so we need to check that
        # ourselves or a wrong path goes unnoticed
        if not c.read(path):
            log.warning("Could not read configuration from %s" % path)

    return c
log.debug("Connecting to database %s @ %s" % (name, hostname))
- return database.Connection(hostname, name, user=user, password=password)
+ return database.Connection(self, hostname, name, user=user, password=password)
+
def _create_tmp_path(self):
    """
    This function will create some temporary space with the correct permissions.

    An already existing directory is left untouched.
    """
    path = self.path("tmp")

    try:
        os.mkdir(path, mode=0o1777)

    # Ignore if the directory already exists
    except FileExistsError:
        return

    # mkdir() masks the mode with the current umask and the sticky bit
    # might be ignored entirely, so the mode has to be set explicitly
    os.chmod(path, 0o1777)
+
def path(self, *args):
    """
    Joins all arguments onto the base path
    """
    components = (self.basepath,) + args

    return os.path.join(*components)
+
def url_to(self, url):
    """
    Resolves url against the "baseurl" setting
    """
    return urllib.parse.urljoin(self.settings.get("baseurl"), url)
+
def path_to_url(self, path):
    """
    Converts the path of a file on the file system into a URL

    The path is taken relative to the base path and prefixed with "files".
    """
    relpath = os.path.relpath(path, self.basepath)

    return self.url_to(os.path.join("files", relpath))
+
def pakfire(self, *args, **kwargs):
    """
    Returns a new PakfireConfig object for this backend

    All arguments are passed on to config.PakfireConfig.
    """
    pakfire_config = config.PakfireConfig(self, *args, **kwargs)

    return pakfire_config
- # Write configuration to file
- t = self._write_tempfile(config)
+ # Functions to run something in background
- # Launch a new Pakfire instance
- try:
- return pakfire.Pakfire(conf=t, logger=log.log, offline=offline, **kwargs)
def run_task(self, callback, *args):
    """
    Schedules callback(*args) as a background task and returns the task
    """
    coroutine = callback(*args)

    # Hand the coroutine over to the event loop
    task = asyncio.create_task(coroutine)

    # Hold a reference for as long as the task is running so that it
    # won't be garbage-collected and drop it once the task is done
    tasks = self.__tasks
    tasks.add(task)
    task.add_done_callback(tasks.discard)

    return task
+
def run_periodic_task(self, delay, callback, *args):
    """
    Launches a background task which calls callback every delay seconds
    """
    helper = self._periodic_task

    self.run_task(helper, delay, callback, *args)
- finally:
- # Delete the configuration file
- os.unlink(t)
async def _periodic_task(self, delay, callback, *args):
    """
    Helper function for run_periodic_task() that calls the given callback
    and then sleeps for delay seconds, forever.

    Exceptions raised by the callback are logged and do not stop the loop.
    """
    log.debug("Periodic callback %r started" % callback)

    while True:
        try:
            result = callback(*args)

            # The callback may either be a plain function or a coroutine
            if inspect.isawaitable(result):
                await result

        except Exception:
            log.exception("Exception in periodic callback %r" % callback)

        # Wait until it is time for the next run
        await asyncio.sleep(delay)
# Commands
async def command(self, *command, krb5_auth=False, **kwargs):
    """
    Runs the given shell command in a minimal environment

    A fresh temporary directory is created for the runtime of the command
    and used as TMPDIR and as the place for the Kerberos credentials cache.
    If krb5_auth is set, _krb5_auth() is called before the command is run.
    Any other keyword arguments are passed to _command().
    """
    async with self.tempdir() as tmp:
        # The environment only contains what we set here
        env = dict(
            # Fall back to the temporary directory if HOME is not set
            HOME=os.environ.get("HOME", tmp),

            # Tell the system where to put temporary files
            TMPDIR=tmp,

            # Store any Kerberos credentials here
            KRB5CCNAME=os.path.join(tmp, ".krb5cc"),
        )

        # Fetch a Kerberos ticket first if we have been asked to
        if krb5_auth:
            await self._krb5_auth(env=env)

        result = await self._command(*command, env=env, **kwargs)

    return result
+
async def _command(self, *command, return_output=False, input=None, **kwargs):
    """
    Runs the given command and waits until it has finished

    Every line that the command writes to its standard output is sent to
    the logger. If return_output is set, those lines are also returned as
    one string, joined by newlines. input (str or bytes) is sent to the
    standard input of the command. Any other keyword arguments are passed
    to asyncio.create_subprocess_exec().

    Raises CommandExecutionError with the return code and whatever has been
    written to the standard error output if the command exits with a
    non-zero code.
    """
    log.debug("Running command: %s" % " ".join(command))

    # Convert any input to bytes
    if input and not isinstance(input, bytes):
        input = input.encode()

    # Fork child process
    process = await asyncio.create_subprocess_exec(
        *command,
        stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        **kwargs,
    )

    async def send_input():
        # Nothing to do if we have no input
        if not input:
            return

        try:
            process.stdin.write(input)
            await process.stdin.drain()

        # The process might exit without having read all of its input.
        # The return code will tell us whether that was an error.
        except (BrokenPipeError, ConnectionResetError):
            pass

        # Close the input once we are done
        finally:
            process.stdin.close()

    async def read_output():
        lines = []

        # Fetch output of command and send it to the logger
        while True:
            line = await process.stdout.readline()

            # readline() returns an empty result at EOF
            if not line:
                break

            # Decode and strip newline
            line = line.decode(errors="replace").rstrip()

            # Log the output
            log.debug(line)

            # Store the output if requested
            if return_output:
                lines.append(line)

        return lines

    # All pipes have to be served at the same time. Otherwise the process
    # blocks as soon as one of the pipes is full while we are waiting for
    # something to happen on another one.
    _, stdout, stderr = await asyncio.gather(
        send_input(), read_output(), process.stderr.read(),
    )

    # Wait until the process has finished
    returncode = await process.wait()

    # Check the return code
    if returncode:
        stderr = stderr.decode(errors="replace")

        # Log the error
        log.error("Error running command: %s (code=%s)" % (
            " ".join(command), returncode,
        ))
        if stderr:
            log.error(stderr)

        raise CommandExecutionError(returncode, stderr)

    # Return output if requested
    if return_output:
        return "\n".join(stdout)
+
+ async def _krb5_auth(self, **kwargs):
log.debug("Performing Kerberos authentication...")
# Fetch path to keytab
return
# Fetch a Kerberos ticket
- await self.command(
- "kinit", "-k", "-t", keytab, principal,
+ await self._command(
+ "kinit", "-k", "-t", keytab, principal, **kwargs,
)
- async def copy(self, src, dst):
async def exists(self, *args, **kwargs):
    """
    Runs os.path.exists() in a separate thread and returns its result
    """
    result = await asyncio.to_thread(os.path.exists, *args, **kwargs)

    return result
+
async def makedirs(self, path, **kwargs):
    """
    Runs os.makedirs() for path in a separate thread

    Any keyword arguments are passed to os.makedirs().
    """
    result = await asyncio.to_thread(os.makedirs, path, **kwargs)

    return result
+
async def copy(self, src, dst, mode=None):
    """
    Copies the content of the file src to dst

    Only the data is copied, no metadata. If mode is given, it will be
    set on dst after the copy.
    """
    log.debug("Copying %s to %s" % (src, dst))

    # The destination directory might not exist, yet
    await self.make_parent_directory(dst)

    # Copy data without any metadata
    await asyncio.to_thread(shutil.copyfile, src, dst)

    # We are done if no mode has been given
    if not mode:
        return

    await asyncio.to_thread(os.chmod, dst, mode)
+
async def move(self, src, dst, **kwargs):
    """
    Moves src to dst using shutil.move() in a separate thread

    Any keyword arguments are passed to shutil.move().
    """
    log.debug("Moving %s to %s" % (src, dst))

    # The destination directory might not exist, yet
    await self.make_parent_directory(dst)

    mover = asyncio.to_thread(shutil.move, src, dst, **kwargs)
    await mover
+
async def make_parent_directory(self, path):
    """
    Creates the directory that contains path (unless it already exists)
    """
    parent = os.path.dirname(path)

    result = await self.makedirs(parent, exist_ok=True)

    return result
+
async def unlink(self, path):
    """
    Unlinks path

    The file is removed by _unlink() in a separate thread.

    Raises OSError if path is not located inside the base path.
    """
    # Normalize the paths
    path = os.path.abspath(path)
    basepath = os.path.abspath(self.basepath)

    # Check if the path is within our base directory. This has to compare
    # whole path components because a plain string prefix check would
    # accept siblings like "<basepath>-old/..." as well.
    if os.path.commonpath([basepath, path]) != basepath:
        raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

    log.debug("Unlinking %s" % path)

    await asyncio.to_thread(self._unlink, path)
+
def _unlink(self, path):
    """
    Removes the file at path and then any parent directories that have
    become empty

    Nothing else happens if the file could not be removed. Walking up the
    tree stops at the base path, at the tmp directory, or at the first
    directory that could not be removed.
    """
    # Unlink the file we were asked to unlink
    try:
        os.unlink(path)
    except OSError:
        return

    # We never remove these directories
    keep = (self.basepath, self.path("tmp"))

    # Try to delete any empty parent directories
    parent = os.path.dirname(path)

    while parent not in keep:
        # rmdir() fails if the directory is not empty
        try:
            os.rmdir(parent)
        except OSError:
            break

        log.debug(" Cleaned up %s..." % parent)

        # Move on to the next level
        parent = os.path.dirname(parent)
+
async def rmtree(self, path):
    """
    Removes a directory recursively

    It is not an error if path does not exist.

    Raises OSError if path is not located inside the base path.
    """
    # Normalize the paths
    path = os.path.abspath(path)
    basepath = os.path.abspath(self.basepath)

    # Check if the path is within our base directory. This has to compare
    # whole path components because a plain string prefix check would
    # accept siblings like "<basepath>-old/..." as well.
    if os.path.commonpath([basepath, path]) != basepath:
        raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

    log.debug("Removing %s..." % path)

    try:
        await asyncio.to_thread(shutil.rmtree, path)

    # Ignore if path didn't exist in the first place
    except FileNotFoundError:
        pass
- # Copy data without any metadata
- await asyncio.to_thread(shutil.copyfile, src, dst)
async def chmod(self, *args, **kwargs):
    """
    Runs os.chmod() with the given arguments in a separate thread
    """
    result = await asyncio.to_thread(os.chmod, *args, **kwargs)

    return result
+
def tempfile(self, mode="w+b", delete=True):
    """
    Creates a new temporary file in our own tmp directory

    Returns whatever aiofiles.tempfile.NamedTemporaryFile() returns.
    """
    return aiofiles.tempfile.NamedTemporaryFile(
        mode=mode, dir=self.path("tmp"), delete=delete,
    )
+
def tempdir(self, **kwargs):
    """
    Creates a new temporary directory in our own tmp directory

    Any keyword arguments are passed to aiofiles.tempfile.TemporaryDirectory().
    """
    return aiofiles.tempfile.TemporaryDirectory(dir=self.path("tmp"), **kwargs)
def _write_tempfile(self, content):
    """
    Writes the content to a temporary file and returns its path

    The file is created in our own tmp directory and it is not deleted
    automatically. If content is empty, an empty file is created.
    """
    # This function is synchronous and therefore cannot use self.tempfile()
    # which returns an asynchronous context manager that has neither
    # write() nor close()
    with tempfile.NamedTemporaryFile(dir=self.path("tmp"), delete=False) as t:
        # Write the content
        if content:
            t.write(content.encode())

    # The file has been closed when we get here
    return t.name
"""
Opens a package and returns the archive
"""
- return await asyncio.to_thread(self._open, path)
-
- def _open(self, path):
- # Create a dummy Pakfire instance
- p = pakfire.Pakfire(offline=True)
+ log.debug("Opening %s..." % path)
# Open the archive
- return p.open(path)
+ async with self.pakfire() as p:
+ return await asyncio.to_thread(p.open, path)
- def delete_file(self, path, not_before=None):
- self.db.execute("INSERT INTO queue_delete(path, not_before) \
- VALUES(%s, %s)", path, not_before)
@property
def ssl_context(self):
    """
    Creates a new default SSL context

    If both the "client-certificate" and "client-key" settings are set,
    they are loaded into the context as client certificate.
    """
    context = ssl.create_default_context()

    # Fetch client certificate
    certificate = self.settings.get("client-certificate", None)
    key = self.settings.get("client-key", None)

    # Without a complete certificate/key pair there is nothing else to do
    if not (certificate and key):
        return context

    # load_cert_chain() can only read from files, so that both are
    # written to temporary files which are removed straight after
    with tempfile.NamedTemporaryFile(mode="w") as f_cert, \
            tempfile.NamedTemporaryFile(mode="w") as f_key:
        f_cert.write(certificate)
        f_cert.flush()

        f_key.write(key)
        f_key.flush()

        context.load_cert_chain(f_cert.name, f_key.name)

    return context
- while True:
- path = os.path.dirname(path)
async def load_certificate(self, certfile, keyfile):
    """
    Reads certfile and keyfile and stores their content as the
    "client-certificate" and "client-key" settings
    """
    # Setting name -> file to read the value from
    sources = (
        ("client-certificate", certfile),
        ("client-key", keyfile),
    )

    with self.db.transaction():
        for name, filename in sources:
            with open(filename) as f:
                self.settings.set(name, f.read())

        log.info("Updated certificates")
+
async def cleanup(self):
    """
    Called regularly to cleanup any left-over resources

    Cleans up the message queue, the sessions and the uploads (in this order).
    """
    # Messages
    await self.messages.queue.cleanup()

    # Sessions & Uploads
    for subsystem in (self.sessions, self.uploads):
        await subsystem.cleanup()
+
async def sync(self):
    """
    Syncs any repository that should be mirrored

    Runs rsync from the base path to the target from the "sync-target"
    setting. Returns 0 without doing anything if no target is configured.
    """
    log.info("Syncing mirrors...")

    # Fetch the sync target
    target = self.settings.get("sync-target")
    if not target:
        log.warning("No sync target configured")
        return 0

    # Update the timestamp
    await self._update_timestamp()

    options = [
        # Show what is being transferred
        "--verbose",

        # Compress any transferred data
        "--compress",

        # Enable archive mode
        "--archive",

        # Preserve hardlinks, ACLs & XATTRs
        "--hard-links",
        "--acls",
        "--xattrs",

        # Delete any files that we have deleted
        "--delete",
        "--delete-excluded",

        # Remove any empty directories
        "--prune-empty-dirs",

        # Make the transaction atomic
        "--delay-updates",
    ]

    # We are syncing the entire base path
    source = "%s/" % self.basepath

    # Sync the .timestamp
    filters = ["--include=.timestamp"]

    # Add all mirrored repositories
    filters.extend(
        "--include=%s***" % os.path.relpath(repo.local_path(), self.basepath)
        for repo in self.repos.mirrored
    )

    # Exclude everything that hasn't been included
    filters.extend(("--include=*/", "--exclude=*"))

    # Run command
    await self.command("rsync", *options, source, target, *filters, krb5_auth=True)
+
async def _update_timestamp(self):
    """
    Updates the .timestamp file in the root of the exported files

    The file contains the current time as seconds since epoch.
    """
    # Make filename
    path = os.path.join(self.basepath, ".timestamp")

    # Use a timezone-aware time because strftime("%s") is a non-portable
    # extension which interprets naive objects as local time and would
    # shift the result by the UTC offset of this system
    t = datetime.datetime.now(datetime.timezone.utc)

    # Write the current time as seconds since epoch
    async with aiofiles.open(path, mode="w") as f:
        await f.write("%d" % t.timestamp())
+
+
def setup_logging():
    """
    Attaches a handler to the logger of the buildservice backend which
    sends everything to the journal
    """
    log.addHandler(
        systemd.journal.JournalHandler(SYSLOG_IDENTIFIER="pakfire-build-service"),
    )
- try:
- logging.debug("Removing %s..." % path)
- os.rmdir(path)
- except OSError as e:
- logging.error("Could not remove %s: %s" % (path, e))
- break
- self.db.execute("DELETE FROM queue_delete WHERE id = %s", row.id)
+# Setup logging
+setup_logging()