#!/usr/bin/python
+import aiofiles
import asyncio
import configparser
+import datetime
import inspect
import logging
import os
from . import database
from . import distribution
from . import events
-from . import jobqueue
+from . import httpclient
from . import jobs
from . import keys
+from . import logstreams
from . import messages
from . import mirrors
from . import packages
+from . import ratelimiter
+from . import releasemonitoring
from . import repository
from . import settings
from . import sessions
from . import uploads
from . import users
-log = logging.getLogger("pakfire.buildservice")
+# Setup logging
+log = logging.getLogger("pbs")
# Import version
from .__version__ import VERSION as __version__
# Global pakfire settings (from database).
self.settings = settings.Settings(self)
+ # Initialize the HTTP Client
+ self.httpclient = httpclient.HTTPClient(self)
+
self.aws = aws.AWS(self)
self.builds = builds.Builds(self)
self.cache = cache.Cache(self)
self.builders = builders.Builders(self)
self.distros = distribution.Distributions(self)
self.events = events.Events(self)
- self.jobqueue = jobqueue.JobQueue(self)
self.keys = keys.Keys(self)
+ self.logstreams = logstreams.LogStreams(self)
self.messages = messages.Messages(self)
self.mirrors = mirrors.Mirrors(self)
self.packages = packages.Packages(self)
+ self.monitorings = releasemonitoring.Monitorings(self)
+ self.ratelimiter = ratelimiter.RateLimiter(self)
self.repos = repository.Repositories(self)
self.sessions = sessions.Sessions(self)
self.sources = sources.Sources(self)
self.users = users.Users(self)
# Open a connection to bugzilla.
- self.bugzilla = bugtracker.Bugzilla(self)
+ self.bugzilla = bugtracker.Bugzilla(self)
# Create a temporary directory
self._create_tmp_path()
log.info("Pakfire Build Service initialized at %s" % self.basepath)
def launch_background_tasks(self):
	"""
	Launches some initial background tasks and schedules all
	recurring maintenance jobs of the build service.
	"""
	# Launch some initial tasks
	self.run_task(self.users.generate_vapid_keys)
	self.run_task(self.builders.autoscale)

	# All recurring jobs with their interval in seconds
	periodic_tasks = (
		# Regularly sync data to the mirrors
		(300, self.sync),

		# Regularly check the mirrors
		(300, self.mirrors.check),

		# Regularly fetch sources
		(300, self.sources.fetch),

		# Regularly check for new releases
		(300, self.monitorings.check),

		# Cleanup regularly
		(3600, self.cleanup),
	)

	# Schedule all periodic tasks
	for interval, callback in periodic_tasks:
		self.run_periodic_task(interval, callback)
+
def read_config(self, path):
c = configparser.ConfigParser()
log.debug("Connecting to database %s @ %s" % (name, hostname))
- return database.Connection(hostname, name, user=user, password=password)
+ return database.Connection(self, hostname, name, user=user, password=password)
def _create_tmp_path(self):
"""
"""
return os.path.join(self.basepath, *args)
def url_to(self, url):
	"""
	Takes a relative URL and makes it absolute
	"""
	# Resolve the given URL against the configured base URL
	return urllib.parse.urljoin(self.settings.get("baseurl"), url)
+
def path_to_url(self, path):
	"""
	Takes a path to a file on the file system and converts it into a URL
	"""
	# All exported files are reachable relative to "files/"
	relpath = os.path.relpath(path, self.basepath)

	# Let url_to() make the URL absolute
	return self.url_to(os.path.join("files", relpath))
def pakfire(self, *args, **kwargs):
"""
log.debug("Periodic callback %r started" % callback)
while True:
- # Wait a little moment
- await asyncio.sleep(delay)
-
try:
ret = callback(*args)
except Exception as e:
log.error("Exception in periodic callback %r" % callback, exc_info=True)
+ # Wait a little moment
+ await asyncio.sleep(delay)
+
# Commands
async def command(self, *command, krb5_auth=False, **kwargs):
"""
Runs this shell command
"""
- with tempfile.TemporaryDirectory() as tmp:
+ async with self.tempdir() as tmp:
# Create a minimal environment
env = {
"HOME" : os.environ.get("HOME", tmp),
# Run the command
return await self._command(*command, env=env, **kwargs)
- async def _command(self, *command, return_output=False, **kwargs):
+ async def _command(self, *command, return_output=False, input=None, **kwargs):
log.debug("Running command: %s" % " ".join(command))
# Fork child process
process = await asyncio.create_subprocess_exec(
*command,
- stdin=asyncio.subprocess.DEVNULL,
+ stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs,
)
+ # Send input
+ if input:
+ # Convert to bytes
+ if not isinstance(input, bytes):
+ input = input.encode()
+
+ # Write the entire data chunk by chunk
+ while input:
+ chunk, input = input[0:64], input[64:]
+ if not chunk:
+ break
+
+ process.stdin.write(chunk)
+ await process.stdin.drain()
+
+ # Close the input once we are done
+ process.stdin.close()
+
stdout = []
# Fetch output of command and send it to the logger
"kinit", "-k", "-t", keytab, principal, **kwargs,
)
async def exists(self, *args, **kwargs):
	"""
	Checks whether a file exists without blocking the event loop
	"""
	# Dispatch the blocking stat call to a worker thread
	result = await asyncio.to_thread(os.path.exists, *args, **kwargs)

	return result
+
async def makedirs(self, path, **kwargs):
	"""
	Creates a new directory (including any missing parents)
	"""
	# os.makedirs() blocks, so hand it off to a worker thread
	return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
async def copy(self, src, dst, mode=None):
"""
Copies a file from src to dst
if mode:
await asyncio.to_thread(os.chmod, dst, mode)
async def move(self, src, dst, **kwargs):
	"""
	Moves something from src to dst

	The parent directory of dst is created first if it does not exist, yet.
	"""
	log.debug("Moving %s to %s" % (src, dst))

	# The destination directory might not exist, yet
	await self.make_parent_directory(dst)

	# shutil.move() blocks, so run it in a worker thread
	await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
async def make_parent_directory(self, path):
	"""
	Creates the parent directory of path
	"""
	# Strip the last path component to find the parent
	parent = os.path.dirname(path)

	# An already-existing directory is fine (exist_ok)
	return await self.makedirs(parent, exist_ok=True)
async def unlink(self, path):
"""
path = os.path.dirname(path)
# Break if we reached the base path
- if path == self.basepath:
+ if path == self.basepath or path == self.path("tmp"):
break
# Call rmdir()
log.debug(" Cleaned up %s..." % path)
async def rmtree(self, path):
	"""
	Removes a directory recursively

	Only paths inside the base directory may be removed; anything else
	raises OSError. It is not an error if path does not exist.
	"""
	# Normalize the path
	path = os.path.abspath(path)

	# Check if the path is within our base directory.
	# A plain startswith() check would also match sibling paths like
	# "<basepath>-foo", so compare whole path components instead.
	try:
		inside = os.path.commonpath([path, self.basepath]) == self.basepath
	except ValueError:
		# Raised for paths that cannot share a common prefix
		inside = False

	if not inside:
		raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))

	log.debug("Removing %s..." % path)

	try:
		await asyncio.to_thread(shutil.rmtree, path)

	# Ignore if path didn't exist in the first place
	except FileNotFoundError:
		pass
+
async def chmod(self, *args, **kwargs):
	"""
	Changes the mode of a file

	All arguments are passed through to os.chmod() which is run in a
	worker thread so that the event loop is not blocked.
	"""
	return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
def tempfile(self, mode="w+b", delete=True):
	"""
	Returns an open file handle to a new temporary file
	"""
	# All temporary files live in our own tmp directory
	return aiofiles.tempfile.NamedTemporaryFile(
		mode=mode, dir=self.path("tmp"), delete=delete)
+
def tempdir(self, **kwargs):
	"""
	Asynchronously creates a new temporary directory
	"""
	# Created inside our own tmp directory by default
	return aiofiles.tempfile.TemporaryDirectory(dir=self.path("tmp"), **kwargs)
+
def _write_tempfile(self, content):
"""
Writes the content to a temporary file and returns its path
"""
- t = tempfile.NamedTemporaryFile(delete=False)
+ t = self.tempfile(delete=False)
# Write the content
if content:
"""
Opens a package and returns the archive
"""
- return await asyncio.to_thread(self._open, path)
-
- def _open(self, path):
log.debug("Opening %s..." % path)
# Open the archive
- with self.pakfire() as p:
- return p.open(path)
+ async with self.pakfire() as p:
+ return await asyncio.to_thread(p.open, path)
@property
def ssl_context(self):
log.warning("No sync target configured")
return 0
+ # Update the timestamp
+ await self._update_timestamp()
+
commandline = [
"rsync",
# Show what is being transferred
- #"--verbose",
+ "--verbose",
# Compress any transferred data
"--compress",
# Add source & target
"%s/" % self.basepath,
target,
+
+ # Sync the .timestamp
+ "--include=.timestamp",
]
# Add all mirrored repositories
# Run command
await self.command(*commandline, krb5_auth=True)
async def _update_timestamp(self):
	"""
	Updates the .timestamp file in the root of the exported files
	"""
	# Make filename
	path = os.path.join(self.basepath, ".timestamp")

	# Use an aware UTC datetime; a naive utcnow() formatted with
	# strftime("%s") would apply the *local* timezone (and "%s" is a
	# non-portable glibc extension), writing a wrong epoch value on
	# any host that is not running in UTC.
	t = datetime.datetime.now(datetime.timezone.utc)

	# Write the current time as seconds since epoch
	async with aiofiles.open(path, mode="w") as f:
		await f.write("%d" % t.timestamp())
+
def setup_logging():
"""
Configures the logger for the buildservice backend
"""
- # Do not propagate anything from the build service up to any Pakfire instances
- log.propagate = 0
-
- # Enable debug logging
- log.setLevel(logging.DEBUG)
-
# Log everything to journal
handler = systemd.journal.JournalHandler(
SYSLOG_IDENTIFIER="pakfire-build-service",