#!/usr/bin/python
+import aiofiles
import asyncio
import configparser
+import datetime
import inspect
import logging
import os
import pakfire
import shutil
+import ssl
import systemd.journal
import tempfile
import urllib.parse
from . import database
from . import distribution
from . import events
-from . import jobqueue
+from . import httpclient
from . import jobs
from . import keys
+from . import logstreams
from . import messages
from . import mirrors
from . import packages
+from . import ratelimiter
+from . import releasemonitoring
from . import repository
from . import settings
from . import sessions
from . import uploads
from . import users
-log = logging.getLogger("pakfire.buildservice")
+# Setup logging
+log = logging.getLogger("pbs")
# Import version
from .__version__ import VERSION as __version__
# A list of any background tasks
__tasks = set()
- def __init__(self, config_file):
- # Read configuration file.
+ def __init__(self, config_file, test=False):
+ self.test = test
+
+ # Read configuration file
self.config = self.read_config(config_file)
# Fetch the base path
# Global pakfire settings (from database).
self.settings = settings.Settings(self)
+ # Initialize the HTTP Client
+ self.httpclient = httpclient.HTTPClient(self)
+
self.aws = aws.AWS(self)
self.builds = builds.Builds(self)
self.cache = cache.Cache(self)
self.builders = builders.Builders(self)
self.distros = distribution.Distributions(self)
self.events = events.Events(self)
- self.jobqueue = jobqueue.JobQueue(self)
self.keys = keys.Keys(self)
+ self.logstreams = logstreams.LogStreams(self)
self.messages = messages.Messages(self)
self.mirrors = mirrors.Mirrors(self)
self.packages = packages.Packages(self)
+ self.monitorings = releasemonitoring.Monitorings(self)
+ self.ratelimiter = ratelimiter.RateLimiter(self)
self.repos = repository.Repositories(self)
self.sessions = sessions.Sessions(self)
self.sources = sources.Sources(self)
self.users = users.Users(self)
# Open a connection to bugzilla.
- self.bugzilla = bugtracker.Bugzilla(self)
+ self.bugzilla = bugtracker.Bugzilla(self)
+
+ # Create a temporary directory
+ self._create_tmp_path()
log.info("Pakfire Build Service initialized at %s" % self.basepath)
+ def launch_background_tasks(self):
+ # Launch some initial tasks
+ self.run_task(self.users.generate_vapid_keys)
+ self.run_task(self.builders.autoscale)
+
+ # Regularly sync data to the mirrors
+ self.run_periodic_task(300, self.sync)
+
+ # Regularly check the mirrors
+ self.run_periodic_task(300, self.mirrors.check)
+
+ # Regularly fetch sources
+ self.run_periodic_task(300, self.sources.fetch)
+
+ # Regularly check for new releases
+ self.run_periodic_task(300, self.monitorings.check)
+
+ # Cleanup regularly
+ self.run_periodic_task(3600, self.cleanup)
+
def read_config(self, path):
c = configparser.ConfigParser()
log.debug("Connecting to database %s @ %s" % (name, hostname))
- return database.Connection(hostname, name, user=user, password=password)
+ return database.Connection(self, hostname, name, user=user, password=password)
+
+ def _create_tmp_path(self):
+ """
+ This function will create some temporary space with the correct permissions.
+ """
+ path = self.path("tmp")
+
+ try:
+ os.mkdir(path, mode=0o1777)
+
+ # Ignore if the directory already exists
+ except FileExistsError:
+ pass
def path(self, *args):
"""
"""
return os.path.join(self.basepath, *args)
- def path_to_url(self, path):
+ def url_to(self, url):
"""
- Takes a path to a file on the file system and converts it into a URL
+ Takes a relative URL and makes it absolute
"""
# The base URL
baseurl = self.settings.get("baseurl")
+ # Join it all together
+ return urllib.parse.urljoin(baseurl, url)
+
+ def path_to_url(self, path):
+ """
+ Takes a path to a file on the file system and converts it into a URL
+ """
# Path to package
path = os.path.join(
"files", os.path.relpath(path, self.basepath),
)
- # Join it all together
- return urllib.parse.urljoin(baseurl, path)
+ return self.url_to(path)
def pakfire(self, *args, **kwargs):
"""
log.debug("Periodic callback %r started" % callback)
while True:
- # Wait a little moment
- await asyncio.sleep(delay)
-
try:
ret = callback(*args)
except Exception as e:
log.error("Exception in periodic callback %r" % callback, exc_info=True)
+ # Wait a little moment
+ await asyncio.sleep(delay)
+
# Commands
- async def command(self, *args, krb5_auth=False, **kwargs):
+ async def command(self, *command, krb5_auth=False, **kwargs):
"""
Runs this shell command
"""
- # Authenticate using Kerberos
- if krb5_auth:
- await self.krb5_auth()
+ async with self.tempdir() as tmp:
+ # Create a minimal environment
+ env = {
+ "HOME" : os.environ.get("HOME", tmp),
+
+ # Tell the system where to put temporary files
+ "TMPDIR" : tmp,
+
+ # Store any Kerberos credentials here
+ "KRB5CCNAME" : os.path.join(tmp, ".krb5cc"),
+ }
+
+ # Authenticate using Kerberos
+ if krb5_auth:
+ await self._krb5_auth(env=env)
+
+ # Run the command
+ return await self._command(*command, env=env, **kwargs)
- log.debug("Running command: %s" % " ".join(args))
+ async def _command(self, *command, return_output=False, input=None, **kwargs):
+ log.debug("Running command: %s" % " ".join(command))
# Fork child process
process = await asyncio.create_subprocess_exec(
- *args,
- stdin=asyncio.subprocess.DEVNULL,
+ *command,
+ stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.STDOUT,
+ stderr=asyncio.subprocess.PIPE,
**kwargs,
)
+ # Send input
+ if input:
+ # Convert to bytes
+ if not isinstance(input, bytes):
+ input = input.encode()
+
+ # Write the entire data chunk by chunk
+ while input:
+ chunk, input = input[0:64], input[64:]
+ if not chunk:
+ break
+
+ process.stdin.write(chunk)
+ await process.stdin.drain()
+
+ # Close the input once we are done
+ process.stdin.close()
+
+ stdout = []
+
# Fetch output of command and send it to the logger
while True:
line = await process.stdout.readline()
# Strip newline
line = line.rstrip()
- log.info(line)
+ # Log the output
+ log.debug(line)
+
+ # Store the output if requested
+ if return_output:
+ stdout.append(line)
# Wait until the process has finished
- await process.wait()
+ returncode = await process.wait()
+
+ # Check the return code
+ if returncode:
+ # Fetch any output from the standard error output
+ stderr = await process.stderr.read()
+ stderr = stderr.decode()
- async def krb5_auth(self):
+ # Log the error
+ log.error("Error running command: %s (code=%s)" % (
+ " ".join(command), returncode,
+ ))
+ if stderr:
+ log.error(stderr)
+
+ raise CommandExecutionError(returncode, stderr)
+
+ # Return output if requested
+ if return_output:
+ return "\n".join(stdout)
+
+ async def _krb5_auth(self, **kwargs):
log.debug("Performing Kerberos authentication...")
# Fetch path to keytab
return
# Fetch a Kerberos ticket
- await self.command(
- "kinit", "-k", "-t", keytab, principal,
+ await self._command(
+ "kinit", "-k", "-t", keytab, principal, **kwargs,
)
- async def copy(self, src, dst):
+ async def exists(self, *args, **kwargs):
+ """
+ Checks whether a file exists
+ """
+ return await asyncio.to_thread(os.path.exists, *args, **kwargs)
+
+ async def makedirs(self, path, **kwargs):
+ """
+ Creates a new directory
+ """
+ return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
+ async def copy(self, src, dst, mode=None):
"""
Copies a file from src to dst
"""
log.debug("Copying %s to %s" % (src, dst))
- path = os.path.dirname(dst)
-
- # Create destination path (if it does not exist)
- try:
- await asyncio.to_thread(os.makedirs, path)
- except FileExistsError:
- pass
+ # Create parent directory
+ await self.make_parent_directory(dst)
# Copy data without any metadata
await asyncio.to_thread(shutil.copyfile, src, dst)
+ # Set mode
+ if mode:
+ await asyncio.to_thread(os.chmod, dst, mode)
+
+ async def move(self, src, dst, **kwargs):
+ """
+ Moves something from src to dst
+ """
+ log.debug("Moving %s to %s" % (src, dst))
+
+ # Create parent directory
+ await self.make_parent_directory(dst)
+
+ # Move!
+ await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
+ async def make_parent_directory(self, path):
+ """
+ Creates the parent directory of path
+ """
+ path = os.path.dirname(path)
+
+ return await self.makedirs(path, exist_ok=True)
+
async def unlink(self, path):
"""
Unlinks path
"""
+ # Normalize the path
+ path = os.path.abspath(path)
+
+ # Check if the path is within our base directory
+ if not path.startswith(self.basepath):
+ raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
log.debug("Unlinking %s" % path)
+ await asyncio.to_thread(self._unlink, path)
+
+ def _unlink(self, path):
+ # Unlink the file we were asked to unlink
try:
- await asyncio.to_thread(os.unlink, path)
+ os.unlink(path)
except OSError as e:
+ return
+
+ # Try to delete any empty parent directories
+ while True:
+ # Get the parent directory
+ path = os.path.dirname(path)
+
+ # Break if we reached the base path
+ if path == self.basepath or path == self.path("tmp"):
+ break
+
+ # Call rmdir()
+ try:
+ os.rmdir(path)
+ except OSError as e:
+ break
+
+ log.debug(" Cleaned up %s..." % path)
+
+ async def rmtree(self, path):
+ """
+ Removes a directory recursively
+ """
+ # Normalize the path
+ path = os.path.abspath(path)
+
+ # Check if the path is within our base directory
+ if not path.startswith(self.basepath):
+ raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+ log.debug("Removing %s..." % path)
+
+ try:
+ await asyncio.to_thread(shutil.rmtree, path)
+
+ # Ignore if path didn't exist in the first place
+ except FileNotFoundError:
pass
+ async def chmod(self, *args, **kwargs):
+ return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
+ def tempfile(self, mode="w+b", delete=True):
+ """
+ Returns an open file handle to a new temporary file
+ """
+ path = self.path("tmp")
+
+ return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+
+ def tempdir(self, **kwargs):
+ """
+ Asynchronously creates a new temporary directory
+ """
+ # Default to our own tmp directory
+ path = self.path("tmp")
+
+ return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)
+
def _write_tempfile(self, content):
"""
Writes the content to a temporary file and returns its path
"""
- t = tempfile.NamedTemporaryFile(delete=False)
+ t = self.tempfile(delete=False)
# Write the content
if content:
"""
Opens a package and returns the archive
"""
- return await asyncio.to_thread(self._open, path)
-
- def _open(self, path):
- # Create a dummy Pakfire instance
- p = pakfire.Pakfire(offline=True)
+ log.debug("Opening %s..." % path)
# Open the archive
- return p.open(path)
+ async with self.pakfire() as p:
+ return await asyncio.to_thread(p.open, path)
+
+ @property
+ def ssl_context(self):
+ # Create SSL context
+ context = ssl.create_default_context()
+
+ # Fetch client certificate
+ certificate = self.settings.get("client-certificate", None)
+ key = self.settings.get("client-key", None)
+
+ # Apply client certificate
+ if certificate and key:
+ with tempfile.NamedTemporaryFile(mode="w") as f_cert:
+ f_cert.write(certificate)
+ f_cert.flush()
+
+ with tempfile.NamedTemporaryFile(mode="w") as f_key:
+ f_key.write(key)
+ f_key.flush()
+
+ context.load_cert_chain(f_cert.name, f_key.name)
+
+ return context
+
+ async def load_certificate(self, certfile, keyfile):
+ with self.db.transaction():
+ # Load certificate
+ with open(certfile) as f:
+ self.settings.set("client-certificate", f.read())
+
+ # Load key file
+ with open(keyfile) as f:
+ self.settings.set("client-key", f.read())
+
+ log.info("Updated certificates")
async def cleanup(self):
"""
Called regularly to cleanup any left-over resources
"""
+ # Messages
+ await self.messages.queue.cleanup()
+
# Sessions
await self.sessions.cleanup()
log.warning("No sync target configured")
return 0
+ # Update the timestamp
+ await self._update_timestamp()
+
commandline = [
"rsync",
# Show what is being transferred
- #"--verbose",
+ "--verbose",
# Compress any transferred data
"--compress",
# Add source & target
"%s/" % self.basepath,
target,
+
+ # Sync the .timestamp
+ "--include=.timestamp",
]
# Add all mirrored repositories
# Run command
await self.command(*commandline, krb5_auth=True)
+ async def _update_timestamp(self):
+ """
+ Updates the .timestamp file in the root of the exported files
+ """
+ # Make filename
+ path = os.path.join(self.basepath, ".timestamp")
+
+ t = datetime.datetime.utcnow()
+
+ # Write the current time as seconds since epoch
+ async with aiofiles.open(path, mode="w") as f:
+ await f.write(t.strftime("%s"))
+
def setup_logging():
"""
Configures the logger for the buildservice backend
"""
- # Do not propagate anything from the build service up to any Pakfire instances
- log.propagate = 0
-
- # Enable debug logging
- log.setLevel(logging.DEBUG)
-
# Log everything to journal
handler = systemd.journal.JournalHandler(
SYSLOG_IDENTIFIER="pakfire-build-service",