#!/usr/bin/python
-from __future__ import absolute_import
-
-import ConfigParser
+import aiofiles
+import asyncio
+import configparser
+import datetime
+import inspect
import logging
import os
import pakfire
+import shutil
+import ssl
+import systemd.journal
+import tempfile
+import urllib.parse
-from . import arches
+from . import aws
from . import bugtracker
from . import builders
from . import builds
from . import cache
+from . import config
from . import database
from . import distribution
-from . import geoip
-from . import jobqueue
+from . import events
+from . import httpclient
+from . import jobs
from . import keys
-from . import logs
+from . import logstreams
from . import messages
from . import mirrors
from . import packages
+from . import ratelimiter
+from . import releasemonitoring
from . import repository
from . import settings
from . import sessions
from . import sources
-from . import updates
from . import uploads
from . import users
-log = logging.getLogger("backend")
-log.propagate = 1
+# Setup logging
+log = logging.getLogger("pbs")
# Import version
from .__version__ import VERSION as __version__
from .constants import *
class Backend(object):
- def __init__(self, config_file=None):
- # Read configuration file.
+ version = __version__
+
+ # A list of any background tasks
+ __tasks = set()
+
+ def __init__(self, config_file, test=False):
+ self.test = test
+
+ # Read configuration file
self.config = self.read_config(config_file)
+ # Fetch the base path
+ self.basepath = self.config.get("global", "basepath")
+
# Global pakfire settings (from database).
self.settings = settings.Settings(self)
- self.arches = arches.Arches(self)
+ # Initialize the HTTP Client
+ self.httpclient = httpclient.HTTPClient(self)
+
+ self.aws = aws.AWS(self)
self.builds = builds.Builds(self)
self.cache = cache.Cache(self)
- self.geoip = geoip.GeoIP(self)
- self.jobs = builds.Jobs(self)
+ self.jobs = jobs.Jobs(self)
self.builders = builders.Builders(self)
self.distros = distribution.Distributions(self)
- self.jobqueue = jobqueue.JobQueue(self)
+ self.events = events.Events(self)
self.keys = keys.Keys(self)
+ self.logstreams = logstreams.LogStreams(self)
self.messages = messages.Messages(self)
self.mirrors = mirrors.Mirrors(self)
self.packages = packages.Packages(self)
+ self.monitorings = releasemonitoring.Monitorings(self)
+ self.ratelimiter = ratelimiter.RateLimiter(self)
self.repos = repository.Repositories(self)
self.sessions = sessions.Sessions(self)
self.sources = sources.Sources(self)
- self.updates = updates.Updates(self)
self.uploads = uploads.Uploads(self)
self.users = users.Users(self)
# Open a connection to bugzilla.
- self.bugzilla = bugtracker.Bugzilla(self)
+ self.bugzilla = bugtracker.Bugzilla(self)
- # A pool to store strings (for comparison).
- self.pool = pakfire.satsolver.Pool("dummy")
+ # Create a temporary directory
+ self._create_tmp_path()
- def read_config(self, path):
- c = ConfigParser.SafeConfigParser()
+ log.info("Pakfire Build Service initialized at %s" % self.basepath)
- # Load default configuration file first
- paths = [
- os.path.join(CONFIGSDIR, "pbs.conf"),
- ]
+ def launch_background_tasks(self):
+ # Launch some initial tasks
+ self.run_task(self.users.generate_vapid_keys)
+ self.run_task(self.builders.autoscale)
- if path:
- paths.append(path)
+ # Regularly sync data to the mirrors
+ self.run_periodic_task(300, self.sync)
- # Load all configuration files
- for path in paths:
- if os.path.exists(path):
- log.debug("Loading configuration from %s" % path)
- c.read(path)
- else:
- log.error("No such file %s" % path)
+ # Regularly check the mirrors
+ self.run_periodic_task(300, self.mirrors.check)
+
+ # Regularly fetch sources
+ self.run_periodic_task(300, self.sources.fetch)
+
+ # Regularly check for new releases
+ self.run_periodic_task(300, self.monitorings.check)
+
+ # Cleanup regularly
+ self.run_periodic_task(3600, self.cleanup)
+
+ def read_config(self, path):
+ c = configparser.ConfigParser()
+
+ # Read configuration from file
+ if path:
+ c.read(path)
return c
@lazy_property
def db(self):
- name = self.config.get("database", "name")
- hostname = self.config.get("database", "hostname")
- user = self.config.get("database", "user")
- password = self.config.get("database", "password")
+ try:
+ name = self.config.get("database", "name")
+ hostname = self.config.get("database", "hostname")
+ user = self.config.get("database", "user")
+ password = self.config.get("database", "password")
+ except configparser.Error as e:
+ log.error("Error parsing the config: %s" % e.message)
log.debug("Connecting to database %s @ %s" % (name, hostname))
- return database.Connection(hostname, name, user=user, password=password)
+ return database.Connection(self, hostname, name, user=user, password=password)
+
+ def _create_tmp_path(self):
+ """
+ This function will create some temporary space with the correct permissions.
+ """
+ path = self.path("tmp")
- def cleanup_files(self):
- query = self.db.query("SELECT * FROM queue_delete")
+ try:
+ os.mkdir(path, mode=0o1777)
- for row in query:
- if not row.path:
- continue
+ # Ignore if the directory already exists
+ except FileExistsError:
+ pass
- path = os.path.join(PACKAGES_DIR, row.path)
+ def path(self, *args):
+ """
+ Takes a relative path and makes it absolute
+ """
+ return os.path.join(self.basepath, *args)
+ def url_to(self, url):
+ """
+ Takes a relative URL and makes it absolute
+ """
+ # The base URL
+ baseurl = self.settings.get("baseurl")
+
+ # Join it all together
+ return urllib.parse.urljoin(baseurl, url)
+
+ def path_to_url(self, path):
+ """
+ Takes a path to a file on the file system and converts it into a URL
+ """
+ # Path to package
+ path = os.path.join(
+ "files", os.path.relpath(path, self.basepath),
+ )
+
+ return self.url_to(path)
+
+ def pakfire(self, *args, **kwargs):
+ """
+ Launches a new Pakfire instance with the given configuration
+ """
+ return config.PakfireConfig(self, *args, **kwargs)
+
+ # Functions to run something in background
+
+ def run_task(self, callback, *args):
+ """
+ Runs the given coroutine in the background
+ """
+ # Create a new task
+ task = asyncio.create_task(callback(*args))
+
+ # Keep a reference to the task and remove it when the task has finished
+ self.__tasks.add(task)
+ task.add_done_callback(self.__tasks.discard)
+
+ return task
+
+ def run_periodic_task(self, delay, callback, *args):
+ """
+ Calls the given callback periodically in the background
+ """
+ self.run_task(self._periodic_task, delay, callback, *args)
+
+ async def _periodic_task(self, delay, callback, *args):
+ """
+ Helper function for run_periodic_task() that will call the given
+	callback regularly after the timer has expired.
+ """
+ log.debug("Periodic callback %r started" % callback)
+
+ while True:
try:
- logging.debug("Removing %s..." % path)
- os.unlink(path)
- except OSError, e:
- logging.error("Could not remove %s: %s" % (path, e))
+ ret = callback(*args)
- while True:
- path = os.path.dirname(path)
+ # Await ret if callback is a coroutine
+ if inspect.isawaitable(ret):
+ await ret
- # Stop if we are running outside of the tree.
- if not path.startswith(PACKAGES_DIR):
- break
+ except Exception as e:
+ log.error("Exception in periodic callback %r" % callback, exc_info=True)
- # If the directory is not empty, we cannot remove it.
- if os.path.exists(path) and os.listdir(path):
- break
+ # Wait a little moment
+ await asyncio.sleep(delay)
+
+ # Commands
+
+ async def command(self, *command, krb5_auth=False, **kwargs):
+ """
+ Runs this shell command
+ """
+ async with self.tempdir() as tmp:
+ # Create a minimal environment
+ env = {
+ "HOME" : os.environ.get("HOME", tmp),
+
+ # Tell the system where to put temporary files
+ "TMPDIR" : tmp,
- try:
- logging.debug("Removing %s..." % path)
- os.rmdir(path)
- except OSError, e:
- logging.error("Could not remove %s: %s" % (path, e))
+ # Store any Kerberos credentials here
+ "KRB5CCNAME" : os.path.join(tmp, ".krb5cc"),
+ }
+
+ # Authenticate using Kerberos
+ if krb5_auth:
+ await self._krb5_auth(env=env)
+
+ # Run the command
+ return await self._command(*command, env=env, **kwargs)
+
+ async def _command(self, *command, return_output=False, input=None, **kwargs):
+ log.debug("Running command: %s" % " ".join(command))
+
+ # Fork child process
+ process = await asyncio.create_subprocess_exec(
+ *command,
+ stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ **kwargs,
+ )
+
+ # Send input
+ if input:
+ # Convert to bytes
+ if not isinstance(input, bytes):
+ input = input.encode()
+
+ # Write the entire data chunk by chunk
+ while input:
+ chunk, input = input[0:64], input[64:]
+ if not chunk:
break
- self.db.execute("DELETE FROM queue_delete WHERE id = %s", row.id)
+ process.stdin.write(chunk)
+ await process.stdin.drain()
+
+ # Close the input once we are done
+ process.stdin.close()
+
+ stdout = []
+
+ # Fetch output of command and send it to the logger
+ while True:
+ line = await process.stdout.readline()
+ if not line:
+ break
+
+ # Decode line
+ line = line.decode()
+
+ # Strip newline
+ line = line.rstrip()
+
+ # Log the output
+ log.debug(line)
+
+ # Store the output if requested
+ if return_output:
+ stdout.append(line)
+
+ # Wait until the process has finished
+ returncode = await process.wait()
+
+ # Check the return code
+ if returncode:
+ # Fetch any output from the standard error output
+ stderr = await process.stderr.read()
+ stderr = stderr.decode()
+
+ # Log the error
+ log.error("Error running command: %s (code=%s)" % (
+ " ".join(command), returncode,
+ ))
+ if stderr:
+ log.error(stderr)
+
+ raise CommandExecutionError(returncode, stderr)
+
+ # Return output if requested
+ if return_output:
+ return "\n".join(stdout)
+
+ async def _krb5_auth(self, **kwargs):
+ log.debug("Performing Kerberos authentication...")
+
+ # Fetch path to keytab
+ keytab = self.settings.get("krb5-keytab")
+ if not keytab:
+ log.warning("No keytab configured")
+ return
+
+ # Fetch Kerberos principal
+ principal = self.settings.get("krb5-principal")
+ if not principal:
+ log.warning("No Kerberos principal configured")
+ return
+
+ # Fetch a Kerberos ticket
+ await self._command(
+ "kinit", "-k", "-t", keytab, principal, **kwargs,
+ )
+
+ async def exists(self, *args, **kwargs):
+ """
+ Checks whether a file exists
+ """
+ return await asyncio.to_thread(os.path.exists, *args, **kwargs)
+
+ async def makedirs(self, path, **kwargs):
+ """
+ Creates a new directory
+ """
+ return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
+ async def copy(self, src, dst, mode=None):
+ """
+ Copies a file from src to dst
+ """
+ log.debug("Copying %s to %s" % (src, dst))
+
+ # Create parent directory
+ await self.make_parent_directory(dst)
+
+ # Copy data without any metadata
+ await asyncio.to_thread(shutil.copyfile, src, dst)
+
+ # Set mode
+ if mode:
+ await asyncio.to_thread(os.chmod, dst, mode)
+
+ async def move(self, src, dst, **kwargs):
+ """
+ Moves something from src to dst
+ """
+ log.debug("Moving %s to %s" % (src, dst))
+
+ # Create parent directory
+ await self.make_parent_directory(dst)
+
+ # Move!
+ await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
+ async def make_parent_directory(self, path):
+ """
+ Creates the parent directory of path
+ """
+ path = os.path.dirname(path)
+
+ return await self.makedirs(path, exist_ok=True)
+
+ async def unlink(self, path):
+ """
+ Unlinks path
+ """
+ # Normalize the path
+ path = os.path.abspath(path)
+
+ # Check if the path is within our base directory
+ if not path.startswith(self.basepath):
+ raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+ log.debug("Unlinking %s" % path)
+
+ await asyncio.to_thread(self._unlink, path)
+
+ def _unlink(self, path):
+ # Unlink the file we were asked to unlink
+ try:
+ os.unlink(path)
+ except OSError as e:
+ return
+
+ # Try to delete any empty parent directories
+ while True:
+ # Get the parent directory
+ path = os.path.dirname(path)
+
+ # Break if we reached the base path
+ if path == self.basepath or path == self.path("tmp"):
+ break
+
+ # Call rmdir()
+ try:
+ os.rmdir(path)
+ except OSError as e:
+ break
+
+ log.debug(" Cleaned up %s..." % path)
+
+ async def rmtree(self, path):
+ """
+ Removes a directory recursively
+ """
+ # Normalize the path
+ path = os.path.abspath(path)
+
+ # Check if the path is within our base directory
+ if not path.startswith(self.basepath):
+ raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+ log.debug("Removing %s..." % path)
+
+ try:
+ await asyncio.to_thread(shutil.rmtree, path)
+
+ # Ignore if path didn't exist in the first place
+ except FileNotFoundError:
+ pass
+
+ async def chmod(self, *args, **kwargs):
+ return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
+ def tempfile(self, mode="w+b", delete=True):
+ """
+ Returns an open file handle to a new temporary file
+ """
+ path = self.path("tmp")
+
+ return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+
+ def tempdir(self, **kwargs):
+ """
+ Asynchronously creates a new temporary directory
+ """
+ # Default to our own tmp directory
+ path = self.path("tmp")
+
+ return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)
+
+ def _write_tempfile(self, content):
+ """
+ Writes the content to a temporary file and returns its path
+ """
+ t = self.tempfile(delete=False)
+
+ # Write the content
+ if content:
+ t.write(content.encode())
+
+ # Close the file
+ t.close()
+
+ return t.name
+
+ async def open(self, path):
+ """
+ Opens a package and returns the archive
+ """
+ log.debug("Opening %s..." % path)
+
+ # Open the archive
+ async with self.pakfire() as p:
+ return await asyncio.to_thread(p.open, path)
+
+ @property
+ def ssl_context(self):
+ # Create SSL context
+ context = ssl.create_default_context()
+
+ # Fetch client certificate
+ certificate = self.settings.get("client-certificate", None)
+ key = self.settings.get("client-key", None)
+
+ # Apply client certificate
+ if certificate and key:
+ with tempfile.NamedTemporaryFile(mode="w") as f_cert:
+ f_cert.write(certificate)
+ f_cert.flush()
+
+ with tempfile.NamedTemporaryFile(mode="w") as f_key:
+ f_key.write(key)
+ f_key.flush()
+
+ context.load_cert_chain(f_cert.name, f_key.name)
+
+ return context
+
+ async def load_certificate(self, certfile, keyfile):
+ with self.db.transaction():
+ # Load certificate
+ with open(certfile) as f:
+ self.settings.set("client-certificate", f.read())
+
+ # Load key file
+ with open(keyfile) as f:
+ self.settings.set("client-key", f.read())
+
+ log.info("Updated certificates")
+
+ async def cleanup(self):
+ """
+ Called regularly to cleanup any left-over resources
+ """
+ # Messages
+ await self.messages.queue.cleanup()
+
+ # Sessions
+ await self.sessions.cleanup()
+
+ # Uploads
+ await self.uploads.cleanup()
+
+ async def sync(self):
+ """
+ Syncs any repository that should be mirrored
+ """
+ log.info("Syncing mirrors...")
+
+ # Fetch the sync target
+ target = self.settings.get("sync-target")
+ if not target:
+ log.warning("No sync target configured")
+ return 0
+
+ # Update the timestamp
+ await self._update_timestamp()
+
+ commandline = [
+ "rsync",
+
+ # Show what is being transferred
+ "--verbose",
+
+ # Compress any transferred data
+ "--compress",
+
+ # Enable archive mode
+ "--archive",
+
+ # Preserve hardlinks, ACLs & XATTRs
+ "--hard-links",
+ "--acls",
+ "--xattrs",
+
+ # Delete any files that we have deleted
+ "--delete",
+ "--delete-excluded",
+
+ # Remove any empty directories
+ "--prune-empty-dirs",
+
+ # Make the transaction atomic
+ "--delay-updates",
+
+ # Add source & target
+ "%s/" % self.basepath,
+ target,
+
+ # Sync the .timestamp
+ "--include=.timestamp",
+ ]
+
+ # Add all mirrored repositories
+ for repo in self.repos.mirrored:
+ path = os.path.relpath(repo.local_path(), self.basepath)
+
+ commandline.append("--include=%s***" % path)
+
+ # Exclude everything that hasn't been included
+ commandline += ("--include=*/", "--exclude=*")
+
+ # Run command
+ await self.command(*commandline, krb5_auth=True)
+
+ async def _update_timestamp(self):
+ """
+ Updates the .timestamp file in the root of the exported files
+ """
+ # Make filename
+ path = os.path.join(self.basepath, ".timestamp")
+
+ t = datetime.datetime.utcnow()
+
+ # Write the current time as seconds since epoch
+ async with aiofiles.open(path, mode="w") as f:
+ await f.write(t.strftime("%s"))
+
+
+def setup_logging():
+ """
+ Configures the logger for the buildservice backend
+ """
+ # Log everything to journal
+ handler = systemd.journal.JournalHandler(
+ SYSLOG_IDENTIFIER="pakfire-build-service",
+ )
+ log.addHandler(handler)
+
+
+# Setup logging
+setup_logging()