#!/usr/bin/python
+import aiofiles
import asyncio
import configparser
+import datetime
import inspect
import logging
import os
from . import messages
from . import mirrors
from . import packages
+from . import ratelimiter
from . import releasemonitoring
from . import repository
from . import settings
# Initialize the HTTP Client
self.httpclient = httpclient.HTTPClient(self)
- self.aws = aws.AWS(self)
- self.builds = builds.Builds(self)
- self.cache = cache.Cache(self)
- self.jobs = jobs.Jobs(self)
- self.builders = builders.Builders(self)
- self.distros = distribution.Distributions(self)
- self.events = events.Events(self)
- self.keys = keys.Keys(self)
- self.logstreams = logstreams.LogStreams(self)
- self.messages = messages.Messages(self)
- self.mirrors = mirrors.Mirrors(self)
- self.packages = packages.Packages(self)
- self.releasemonitoring = releasemonitoring.ReleaseMonitoring(self)
- self.repos = repository.Repositories(self)
- self.sessions = sessions.Sessions(self)
- self.sources = sources.Sources(self)
- self.uploads = uploads.Uploads(self)
- self.users = users.Users(self)
+ self.aws = aws.AWS(self)
+ self.builds = builds.Builds(self)
+ self.cache = cache.Cache(self)
+ self.jobs = jobs.Jobs(self)
+ self.builders = builders.Builders(self)
+ self.distros = distribution.Distributions(self)
+ self.events = events.Events(self)
+ self.keys = keys.Keys(self)
+ self.logstreams = logstreams.LogStreams(self)
+ self.messages = messages.Messages(self)
+ self.mirrors = mirrors.Mirrors(self)
+ self.packages = packages.Packages(self)
+ self.monitorings = releasemonitoring.Monitorings(self)
+ self.ratelimiter = ratelimiter.RateLimiter(self)
+ self.repos = repository.Repositories(self)
+ self.sessions = sessions.Sessions(self)
+ self.sources = sources.Sources(self)
+ self.uploads = uploads.Uploads(self)
+ self.users = users.Users(self)
# Open a connection to bugzilla.
self.bugzilla = bugtracker.Bugzilla(self)
log.info("Pakfire Build Service initialized at %s" % self.basepath)
+ def launch_background_tasks(self):
+ # Launch some initial tasks
+ self.run_task(self.users.generate_vapid_keys)
+ self.run_task(self.builders.autoscale)
+
+ # Regularly sync data to the mirrors
+ self.run_periodic_task(300, self.sync)
+
+ # Regularly check the mirrors
+ self.run_periodic_task(300, self.mirrors.check)
+
+ # Regularly fetch sources
+ self.run_periodic_task(300, self.sources.fetch)
+
+ # Regularly check for new releases
+ self.run_periodic_task(300, self.monitorings.check)
+
+ # Cleanup regularly
+ self.run_periodic_task(3600, self.cleanup)
+
def read_config(self, path):
c = configparser.ConfigParser()
log.debug("Connecting to database %s @ %s" % (name, hostname))
- return database.Connection(hostname, name, user=user, password=password)
+ return database.Connection(self, hostname, name, user=user, password=password)
def _create_tmp_path(self):
"""
"""
Runs this shell command
"""
- with tempfile.TemporaryDirectory() as tmp:
+ async with self.tempdir() as tmp:
# Create a minimal environment
env = {
"HOME" : os.environ.get("HOME", tmp),
# Run the command
return await self._command(*command, env=env, **kwargs)
- async def _command(self, *command, return_output=False, **kwargs):
+ async def _command(self, *command, return_output=False, input=None, **kwargs):
log.debug("Running command: %s" % " ".join(command))
# Fork child process
process = await asyncio.create_subprocess_exec(
*command,
- stdin=asyncio.subprocess.DEVNULL,
+ stdin=asyncio.subprocess.PIPE if input else asyncio.subprocess.DEVNULL,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
**kwargs,
)
+ # Send input
+ if input:
+ # Convert to bytes
+ if not isinstance(input, bytes):
+ input = input.encode()
+
+ # Write the entire data chunk by chunk
+ while input:
+ chunk, input = input[0:64], input[64:]
+ if not chunk:
+ break
+
+ process.stdin.write(chunk)
+ await process.stdin.drain()
+
+ # Close the input once we are done
+ process.stdin.close()
+
stdout = []
# Fetch output of command and send it to the logger
"kinit", "-k", "-t", keytab, principal, **kwargs,
)
+ async def exists(self, *args, **kwargs):
+ """
+ Checks whether a file exists
+ """
+ return await asyncio.to_thread(os.path.exists, *args, **kwargs)
+
+ async def makedirs(self, path, **kwargs):
+ """
+ Creates a new directory
+ """
+ return await asyncio.to_thread(os.makedirs, path, **kwargs)
+
async def copy(self, src, dst, mode=None):
"""
Copies a file from src to dst
if mode:
await asyncio.to_thread(os.chmod, dst, mode)
+ async def move(self, src, dst, **kwargs):
+ """
+ Moves something from src to dst
+ """
+ log.debug("Moving %s to %s" % (src, dst))
+
+ # Create parent directory
+ await self.make_parent_directory(dst)
+
+ # Move!
+ await asyncio.to_thread(shutil.move, src, dst, **kwargs)
+
async def make_parent_directory(self, path):
"""
Creates the parent directory of path
"""
path = os.path.dirname(path)
- # Create destination path (if it does not exist)
- try:
- await asyncio.to_thread(os.makedirs, path)
- except FileExistsError:
- pass
+ return await self.makedirs(path, exist_ok=True)
async def unlink(self, path):
"""
path = os.path.dirname(path)
# Break if we reached the base path
- if path == self.basepath:
+ if path == self.basepath or path == self.path("tmp"):
break
# Call rmdir()
log.debug(" Cleaned up %s..." % path)
+ async def rmtree(self, path):
+ """
+ Removes a directory recursively
+ """
+ # Normalize the path
+ path = os.path.abspath(path)
+
+ # Check if the path is within our base directory
+ if not path.startswith(self.basepath):
+ raise OSError("Cannot delete %s which is outside %s" % (path, self.basepath))
+
+ log.debug("Removing %s..." % path)
+
+ try:
+ await asyncio.to_thread(shutil.rmtree, path)
+
+ # Ignore if path didn't exist in the first place
+ except FileNotFoundError:
+ pass
+
+ async def chmod(self, *args, **kwargs):
+ return await asyncio.to_thread(os.chmod, *args, **kwargs)
+
def tempfile(self, mode="w+b", delete=True):
"""
Returns an open file handle to a new temporary file
"""
path = self.path("tmp")
- return tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+ return aiofiles.tempfile.NamedTemporaryFile(mode=mode, dir=path, delete=delete)
+
+ def tempdir(self, **kwargs):
+ """
+ Asynchronously creates a new temporary directory
+ """
+ # Default to our own tmp directory
+ path = self.path("tmp")
+
+ return aiofiles.tempfile.TemporaryDirectory(dir=path, **kwargs)
def _write_tempfile(self, content):
"""
"""
Opens a package and returns the archive
"""
- return await asyncio.to_thread(self._open, path)
-
- def _open(self, path):
log.debug("Opening %s..." % path)
# Open the archive
- with self.pakfire() as p:
- return p.open(path)
+ async with self.pakfire() as p:
+ return await asyncio.to_thread(p.open, path)
@property
def ssl_context(self):
log.warning("No sync target configured")
return 0
+ # Update the timestamp
+ await self._update_timestamp()
+
commandline = [
"rsync",
# Show what is being transferred
- #"--verbose",
+ "--verbose",
# Compress any transferred data
"--compress",
# Add source & target
"%s/" % self.basepath,
target,
+
+ # Sync the .timestamp
+ "--include=.timestamp",
]
# Add all mirrored repositories
# Run command
await self.command(*commandline, krb5_auth=True)
+ async def _update_timestamp(self):
+ """
+ Updates the .timestamp file in the root of the exported files
+ """
+ # Make filename
+ path = os.path.join(self.basepath, ".timestamp")
+
+ t = datetime.datetime.utcnow()
+
+ # Write the current time as seconds since epoch
+ async with aiofiles.open(path, mode="w") as f:
+ await f.write(t.strftime("%s"))
+
def setup_logging():
"""