/pakfire-client
/pakfire-daemon
/src/pakfire/__version__.py
-/src/scripts/pakfire-daemon
/src/systemd/*.service
/tests/.root
/tests/libpakfire/arch
# ------------------------------------------------------------------------------
-bin_SCRIPTS = \
- src/scripts/pakfire-daemon
-
-EXTRA_DIST += \
- src/scripts/pakfire-daemon.in
-
-CLEANFILES += \
- src/scripts/pakfire-daemon
-
-# ------------------------------------------------------------------------------
-
pakfire_PYTHON = \
src/pakfire/__init__.py \
src/pakfire/__version__.py \
- src/pakfire/buildservice.py \
src/pakfire/config.py \
src/pakfire/constants.py \
- src/pakfire/daemon.py \
src/pakfire/errors.py \
src/pakfire/i18n.py \
src/pakfire/logger.py \
+++ /dev/null
-###############################################################################
-# #
-# Pakfire - The IPFire package management system #
-# Copyright (C) 2023 Pakfire development team #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <http://www.gnu.org/licenses/>. #
-# #
-###############################################################################
-
-import asyncio
-import json
-import kerberos
-import logging
-import os
-
-import subprocess
-import tempfile
-import tornado.httpclient
-import tornado.simple_httpclient
-import tornado.websocket
-import urllib.parse
-
-from .__version__ import PAKFIRE_VERSION
-from . import _pakfire
-from . import util
-
-# Configure some useful defaults for all requests
-tornado.httpclient.AsyncHTTPClient.configure(
- None, defaults = {
- "user_agent" : "pakfire/%s" % PAKFIRE_VERSION,
- },
-)
-
-# Setup logging
-log = logging.getLogger("pakfire.buildservice")
-
-class AuthError(Exception):
- """
- Raised when the client could not authenticate against the build service
- """
- pass
-
-
-class TransportError(Exception):
- pass
-
-
-class TemporaryConnectionError(TransportError):
- """
- Raised when there is a temporary connection issue and
- the request should be tried again.
- """
- pass
-
-
-class BuildService(_pakfire.BuildService):
- """
- This wraps the parts of the build service
- that has been implemented in libpakfire.
- """
- def __init__(self):
- super().__init__()
-
- # Initialise the HTTP client
- self.client = tornado.httpclient.AsyncHTTPClient()
-
- async def _socket(self, path, **kwargs):
- return await self._request("GET", path,
-
- # Enable websocket and ping once every ten seconds
- websocket=True,
- websocket_ping_interval=10,
- websocket_ping_timeout=60,
-
- **kwargs,
- )
-
- async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
- websocket_ping_timeout=None, authenticate=True,
- body=None, body_producer=None, on_message_callback=None, **kwargs):
- headers = {}
- query_args = {}
-
- # Make absolute URL
- url = urllib.parse.urljoin(self.url, path)
-
- # Change scheme for websocket
- if websocket and url.startswith("https://"):
- url = url.replace("https://", "wss://")
-
- # Filter all query arguments
- for arg in kwargs:
- # Skip anything that is None
- if kwargs[arg] is None:
- continue
-
- # Add to query arguments
- query_args[arg] = kwargs[arg]
-
- # Encode query arguments
- query_args = urllib.parse.urlencode(query_args, doseq=True)
-
- # Add query arguments
- if method in ("GET", "PUT", "DELETE"):
- url = "%s?%s" % (url, query_args)
-
- # Add any arguments to the body
- elif method == "POST":
- if body is None:
- body = query_args
-
- # Perform Kerberos authentication
- if authenticate:
- krb5_context = self._setup_krb5_context(url)
-
- # Fetch the Kerberos client response
- krb5_client_response = kerberos.authGSSClientResponse(krb5_context)
-
- # Set the Negotiate header
- headers |= {
- "Authorization" : "Negotiate %s" % krb5_client_response,
- }
-
- # Make the request
- req = tornado.httpclient.HTTPRequest(
- method=method, url=url, headers=headers, body=body,
-
- # Give the server more time to respond
- request_timeout=60,
-
- # Add all the rest
- body_producer=body_producer,
- )
-
- # Is this a web socket request?
- if websocket:
- return await tornado.websocket.websocket_connect(
- req,
- ping_interval=websocket_ping_interval,
- ping_timeout=websocket_ping_timeout,
- on_message_callback=on_message_callback,
- )
-
- # Send the request and wait for a response
- try:
- res = await self.client.fetch(req)
-
- # Catch any HTTP errors
- except tornado.httpclient.HTTPError as e:
- if e.code in (502, 503):
- raise TemporaryConnectionError from e
-
- # Re-raise anything else
- raise e
-
- # Perform mutual authentication
- if authenticate:
- for header in res.headers.get_list("WWW-Authenticate"):
- # Skip anything that isn't a Negotiate header
- if not header.startswith("Negotiate "):
- continue
-
- # Fetch the server response
- krb5_server_response = header.removeprefix("Negotiate ")
-
- # Validate the server response
- result = kerberos.authGSSClientStep(krb5_context, krb5_server_response)
- if not result == kerberos.AUTH_GSS_COMPLETE:
- raise AuthError("Could not verify the Kerberos server response")
-
- log.debug("Kerberos Server Response validating succeeded")
-
- # Call this so that we won't end in the else block
- break
-
- # If there were no headers
- else:
- raise AuthError("Mutual authentication failed")
-
- # Decode JSON response
- if res.body:
- return json.loads(res.body)
-
- # Empty response
- return {}
-
- async def _proxy(self, cls, *args, **kwargs):
- conn = cls(self, *args, **kwargs)
-
- # Create the initial connection
- await conn.reconnect()
-
- return conn
-
- def _setup_krb5_context(self, url):
- """
- Creates the Kerberos context that can be used to perform client
- authentication against the server, and mutual authentication for the server.
- """
- # Parse the input URL
- url = urllib.parse.urlparse(url)
-
- # Create a new client context
- result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname)
-
- if not result == kerberos.AUTH_GSS_COMPLETE:
- raise AuthError("Could not create Kerberos Client context")
-
- # Next step...
- try:
- result = kerberos.authGSSClientStep(krb5_context, "")
-
- except kerberos.GSSError as e:
- log.error("Kerberos authentication failed: %s" % e)
-
- raise AuthError("%s" % e) from e
-
- if not result == kerberos.AUTH_GSS_CONTINUE:
- raise AuthError("Cloud not continue Kerberos authentication")
-
- return krb5_context
-
- # Builder
-
- async def control(self, *args, **kwargs):
- """
- Creates a control connection
- """
- return await self._proxy(ControlConnection, *args, **kwargs)
-
- async def job(self, *args, **kwargs):
- """
- Creates a control connection for a certain job
- """
- return await self._proxy(JobControlConnection, *args, **kwargs)
-
-
-class Connection(object):
- def __init__(self, service, *args, **kwargs):
- self.service = service
-
- # The active connection
- self.conn = None
-
- # Callbacks
- self.callbacks = {}
-
- # Perform custom initialization
- self.init(*args, **kwargs)
-
- def init(self, *args, **kwargs):
- pass
-
- @property
- def url(self):
- raise NotImplementedError
-
- async def connect(self):
- """
- This will create a connection
- """
- return await self.service._socket(self.url,
- on_message_callback=self.on_message_callback)
-
- async def reconnect(self):
- """
- Tries to reconnect for forever
- """
- attempts = 0
-
- while True:
- attempts += 1
-
- log.debug("Trying to reconnect (attempt %s)..." % attempts)
-
- try:
- self.conn = await self.connect()
-
- # The web service responded with some error
- except tornado.httpclient.HTTPClientError as e:
- log.error("%s: Received HTTP Error %s" % (self.url, e.code))
-
- # 502 - Proxy Error
- # 503 - Service Unavailable
- if e.code in (502, 503):
- await asyncio.sleep(10)
-
- # Raise any unhandled errors
- else:
- raise e
-
- # The web service did not respond in time
- except tornado.simple_httpclient.HTTPTimeoutError as e:
- await asyncio.sleep(30)
-
- # Raise all other exceptions
- except Exception as e:
- raise e
-
- # If the connection was established successfully, we return
- else:
- return
-
- def close(self):
- """
- Closes the connection
- """
- if self.conn:
- self.conn.close()
-
- def on_message_callback(self, message):
- # Fail if no callbacks have been set
- if not self.callbacks:
- raise NotImplementedError
-
- # Decode the message
- message = self._decode_json_message(message)
-
- # Ignore empty messages
- if message is None:
- return
-
- # Log the received message
- log.debug("Received message:\n%s" % json.dumps(message, indent=4))
-
- # Fetch the message type & data
- type = message.get("type")
- data = message.get("data")
-
- # Find a suitable callback
- try:
- callback = self.callbacks[type]
-
- # Log an error for unknown messages and ignore them
- except KeyError:
- log.error("Received message of unknown type '%s'" % type)
- return
-
- # Call the callback
- callback(data)
-
- @staticmethod
- def _decode_json_message(message):
- """
- Takes a received message and decodes it
- """
- # Ignore empty messages
- if message is None:
- return
-
- try:
- message = json.loads(message)
- except json.JSONDecodeError:
- log.error("Could not decode JSON message:\n%s" % message)
- return
-
- return message
-
- async def write_message(self, message, **kwargs):
- """
- Sends a message but encodes it into JSON first
- """
- # This should never happen
- if not self.conn:
- raise RuntimeError("Not connected")
-
- if isinstance(message, dict):
- message = tornado.escape.json_encode(message)
-
- try:
- return await self.conn.write_message(message, **kwargs)
-
- except tornado.websocket.WebSocketClosedError as e:
- # Try to reconnect
- await self.reconnect()
-
- # Try to send the message again
- return await self.write_message(message, **kwargs)
-
-
-class ControlConnection(Connection):
- url = "/api/v1/builders/control"
-
- def init(self, daemon):
- self.daemon = daemon
-
- # Callbacks
- self.callbacks = {
- "job" : self.daemon.job_received,
- }
-
-
-class JobControlConnection(Connection):
- """
- Proxy for Build Jobs
- """
- def init(self, id, worker):
- self.id = id
-
- # Callbacks
- self.callbacks = {
- "abort" : worker.abort,
- }
-
- @property
- def url(self):
- return "/api/v1/jobs/%s" % self.id
-
- async def finished(self, success, packages=None, logfile=None):
- """
- Will tell the hub that a job has finished
- """
- # Upload the log file
- if logfile:
- logfile = await self.service.upload(logfile, filename="%s.log" % self.id)
-
- # Upload the packages
- if packages:
- for package in packages:
- await self.service.upload(package)
-
- # Send request
- self.service.job_finished(self.id, success=success, logfile=logfile, packages=packages)
-
- async def log(self, timestamp, level, message):
- """
- Sends a log message to the hub
- """
- await self.write_message({
- "type" : "log",
- "data" : {
- "timestamp" : timestamp,
- "level" : level,
- "message" : message,
- },
- })
+++ /dev/null
-#!/usr/bin/python3
-
-import asyncio
-import functools
-import glob
-import io
-import json
-import logging
-import logging.handlers
-import multiprocessing
-import os.path
-import setproctitle
-import signal
-import socket
-import tempfile
-
-from . import _pakfire
-from . import buildservice
-from . import config
-from . import logger
-
-from pakfire.constants import *
-from pakfire.i18n import _
-
-# Setup logging
-log = logging.getLogger("pakfire.daemon")
-
-class Daemon(object):
- def __init__(self, config_file="daemon.conf", debug=False, verbose=False):
- self.config = config.Config(config_file)
- self.debug = debug
- self.verbose = verbose
-
- # Setup logger
- self.log = logger.setup(
- "pakfire",
- syslog_identifier="pakfire-daemon",
- enable_console=self.verbose,
- debug=self.debug,
- )
-
- # Initialize the connection to the buildservice
- self.service = buildservice.BuildService()
-
- # Set when this process receives a shutdown signal
- self._shutdown_signalled = None
-
- # List of worker processes.
- self.workers = []
-
- # Stats Connection
- self.stats = None
-
- @property
- def ccache_path(self):
- """
- Returns the ccache path
- """
- return self.config.get("daemon", "ccache_path", "/var/cache/pakfire/ccache")
-
- async def run(self):
- """
- Main loop.
- """
- # Register signal handlers.
- self.register_signal_handlers()
-
- # Initialize shutdown signal
- self._shutdown_signalled = asyncio.Event()
-
- # Create the control connection
- self.control = await self.service.control(daemon=self)
-
- # Run main loop
- while True:
- # Submit stats
- self.service.submit_stats()
-
- # Check if we are running by awaiting the shutdown signal
- try:
- await asyncio.wait_for(self._shutdown_signalled.wait(), timeout=5)
- break
- except asyncio.TimeoutError:
- pass
-
- # Main loop has ended, but we wait until all workers have finished.
- self.terminate_all_workers()
-
- def shutdown(self):
- """
- Terminates all workers and exists the daemon.
- """
- # Ignore if the main method has never been called
- if not self._shutdown_signalled:
- return
-
- # Ignore, if we are already shutting down
- if self._shutdown_signalled.is_set():
- return
-
- self.log.info(_("Shutting down..."))
- self._shutdown_signalled.set()
-
- # Close the control connection
- if self.control:
- self.control.close()
-
- def terminate_all_workers(self):
- """
- Terminates all workers.
- """
- self.log.debug("Sending SIGTERM to all workers")
-
- # First send SIGTERM to all processes
- for worker in self.workers:
- worker.terminate()
-
- self.log.debug("Waiting for workers to terminate")
-
- # Then wait until they all have finished.
- for worker in self.workers:
- worker.join()
-
- self.log.debug("All workers have finished")
-
- # Signal handling.
-
- def register_signal_handlers(self):
- signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
- signal.signal(signal.SIGINT, self.handle_SIGTERM)
- signal.signal(signal.SIGTERM, self.handle_SIGTERM)
-
- def handle_SIGCHLD(self, signum, frame):
- """
- Handle signal SIGCHLD.
- """
- # Find the worker process that has terminated
- for worker in self.workers:
- # Skip any workers that are still alive
- if worker.is_alive():
- continue
-
- self.log.debug("Worker %s has terminated with status %s" % \
- (worker.pid, worker.exitcode))
-
- # Remove the worker from the list
- try:
- self.workers.remove(worker)
- except ValueError:
- pass
-
- # Close the process
- worker.close()
-
- # We finish after handling one worker. If multiple workers have finished
- # at the same time, this handler will be called again to handle it.
- break
-
- def handle_SIGTERM(self, signum, frame):
- """
- Handle signal SIGTERM.
- """
- # Just shut down.
- self.shutdown()
-
- def job_received(self, job):
- """
- Called when this builder was assigned a new job
- """
- # Launch a new worker
- worker = Worker(self, job)
- self.workers.append(worker)
-
- # Run it
- worker.start()
-
- self.log.debug("Spawned a new worker process as PID %s" % worker.pid)
-
-
-class Worker(multiprocessing.Process):
- def __init__(self, daemon, data):
- multiprocessing.Process.__init__(self)
- self.daemon = daemon
-
- # The job that has been received
- self.data = data
-
- @property
- def service(self):
- return self.daemon.service
-
- @property
- def log(self):
- return self.daemon.log
-
- def run(self):
- self.log.debug("Worker %s has launched" % self.pid)
-
- # Register signal handlers
- self.register_signal_handlers()
-
- # Run everything from here asynchronously
- asyncio.run(self._work())
-
- self.log.debug("Worker %s terminated gracefully" % self.pid)
-
- def is_test(self):
- """
- Returns True if this job is a test job
- """
- return self.data.get("test", False)
-
- @property
- def job_id(self):
- return self.data.get("id")
-
- @property
- def ccache(self):
- """
- ccache settings
- """
- return self.data.get("ccache", {})
-
- @property
- def ccache_enabled(self):
- return self.ccache.get("enabled", False)
-
- @property
- def ccache_path(self):
- """
- The ccache path for this job
- """
- path = self.ccache.get("path", None)
-
- if path:
- return os.path.join(self.daemon.ccache_path, path)
-
- async def _work(self):
- """
- Called from the async IO loop doing all the work
- """
- success = False
-
- # Check if we have received some useful data
- if not self.job_id:
- raise ValueError("Did not receive a job ID")
-
- # Set the process title
- setproctitle.setproctitle("pakfire-worker job %s" % self.job_id)
-
- # Fetch the build architecture
- arch = self.data.get("arch")
-
- # Fetch the package URL
- pkg = self.data.get("pkg")
- if not pkg:
- raise ValueError("Did not received a package URL")
-
- # Connect to the service
- self.job = await self.service.job(self.job_id, worker=self)
-
- # Setup build logger
- logger = BuildLogger(self.log, self.job)
-
- # Create a temporary directory in which the built packages will be copied
- with tempfile.TemporaryDirectory(prefix="pakfire-packages-") as target:
- packages = []
-
- # Run the build
- try:
- build = self.build(pkg, arch=arch, target=target,
- logger=logger._log, build_id=self.job_id,
-
- # Always disable using snapshots
- disable_snapshot=True,
-
- # ccache
- disable_ccache=not self.ccache_enabled,
- ccache_path=self.ccache_path,
- )
-
- # Wait until the build process is done and stream the log in the meantime
- while not build.done():
- await logger.stream(timeout=1)
-
- # Await the build task (which would raise any exceptions)
- await build
-
- # Catch any other Exception
- except Exception as e:
- raise e
-
- # The build has finished successfully
- else:
- success = True
-
- # Find any packages
- if not self.is_test():
- packages = glob.glob("%s/*.pfm" % target)
-
- # Notify the service that the job has finished
- finally:
- await self.job.finished(
- success=success,
- logfile=logger.logfile.name,
- packages=packages,
- )
-
- def build(self, *args, **kwargs):
- """
- Sets up a new Pakfire instance and runs it in a new thread.
-
- This method returns an async.Task() object which can be used to track
- if this job is still running.
- """
- thread = asyncio.to_thread(self._build, *args, **kwargs)
-
- return asyncio.create_task(thread)
-
- def _build(self, pkg, arch=None, logger=None, **kwargs):
- # Setup Pakfire instance
- p = _pakfire.Pakfire(arch=arch, conf=self.pakfire_conf, logger=logger)
-
- # Run the build
- return p.build(pkg, **kwargs)
-
- def shutdown(self):
- self.log.debug("Shutting down worker %s" % self.pid)
-
- # XXX figure out what to do, when a build is running
-
- def abort(self, *args, **kwargs):
- """
- Called to abort a running build immediately
- """
- log.warning("Build job has been aborted")
-
- # XXX TODO
-
- # Signal handling.
-
- def register_signal_handlers(self):
- signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
- signal.signal(signal.SIGINT, self.handle_SIGTERM)
- signal.signal(signal.SIGTERM, self.handle_SIGTERM)
-
- def handle_SIGCHLD(self, signum, frame):
- """
- Handle signal SIGCHLD.
- """
- # Must be here so that SIGCHLD won't be propagated to
- # PakfireDaemon.
- pass
-
- def handle_SIGTERM(self, signum, frame):
- """
- Handle signal SIGTERM.
- """
- self.shutdown()
-
- @functools.cached_property
- def pakfire_conf(self):
- """
- Writes the pakfire configuration to file and returns its path
- """
- conf = self.data.get("conf")
-
- # Dump pakfire configuration
- log.debug("Pakfire configuration:\n%s" % conf)
-
- return io.StringIO(conf)
-
-
-class BuildLogger(object):
- """
- This class groups together all sorts of logging.
- """
- def __init__(self, log, job):
- self.log = log
- self.job = job
-
- # Create a logfile
- self.logfile = tempfile.NamedTemporaryFile(mode="w")
-
- # Create a FIFO queue to buffer any log messages
- self.queue = asyncio.Queue()
-
- # Create a new logger
- self.logger = self.log.getChild(self.job.id)
- self.logger.setLevel(logging.DEBUG)
-
- # Log everything to the queue
- handler = logging.handlers.QueueHandler(self.queue)
- handler.setLevel(logging.INFO)
- self.logger.addHandler(handler)
-
- # Log everything to the file
- handler = logging.StreamHandler(self.logfile)
- handler.setLevel(logging.INFO)
- self.logger.addHandler(handler)
-
- def _log(self, level, message):
- # Remove any trailing newline (but only one)
- if message:
- message = message.removesuffix("\n")
-
- return self.logger.log(level, message)
-
- async def stream(self, timeout=0):
- while True:
- # Fetch a message from the queue
- try:
- message = await asyncio.wait_for(self.queue.get(), timeout=timeout)
-
- # If we did not receive any messages within the timeout,
- # we return control back to the caller
- except asyncio.TimeoutError as e:
- break
-
- # Ignore any empty messages
- if message is None:
- continue
-
- # Send message to the service
- await self.job.log(message.created, message.levelno, message.getMessage())
+++ /dev/null
-#!/usr/bin/python3
-##############################################################################
-# #
-# Pakfire - The IPFire package management system #
-# Copyright (C) 2021 Pakfire development team #
-# #
-# This program is free software: you can redistribute it and/or modify #
-# it under the terms of the GNU General Public License as published by #
-# the Free Software Foundation, either version 3 of the License, or #
-# (at your option) any later version. #
-# #
-# This program is distributed in the hope that it will be useful, #
-# but WITHOUT ANY WARRANTY; without even the implied warranty of #
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
-# GNU General Public License for more details. #
-# #
-# You should have received a copy of the GNU General Public License #
-# along with this program. If not, see <http://www.gnu.org/licenses/>. #
-# #
-###############################################################################
-
-import asyncio
-import argparse
-import sys
-
-import pakfire.daemon
-from pakfire.i18n import _
-
-class Cli(object):
- def parse_cli(self):
- parser = argparse.ArgumentParser(
- description = _("Pakfire daemon command line interface"),
- )
-
- parser.add_argument("--config", "-c", nargs="?",
- default="@sysconfdir@/pakfire/daemon.conf",
- help=_("Configuration file to load"))
- parser.add_argument("--debug", action="store_true",
- help=_("Enable debug mode"),
- )
- parser.add_argument("--verbose", action="store_true",
- help=_("Enable logging output on the console"),
- )
-
- return parser.parse_args()
-
- def __call__(self):
- """
- Runs the daemon
- """
- args = self.parse_cli()
-
- # Initialize the daemon
- d = pakfire.daemon.Daemon(
- args.config,
- debug=args.debug,
- verbose=args.verbose,
- )
-
- # Run it
- try:
- r = asyncio.run(d.run())
-
- except (SystemExit, KeyboardInterrupt):
- d.shutdown()
-
- # Return the exit code
- sys.exit(r or 0)
-
-if __name__ == "__main__":
- c = Cli()
- c()