From: Michael Tremer Date: Sat, 5 Oct 2024 14:28:57 +0000 (+0000) Subject: python: Remove the Python daemon implementation X-Git-Tag: 0.9.30~1156 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=d8781671e2abac0e8e8e46ee68aa76e2841c7c3c;p=pakfire.git python: Remove the Python daemon implementation Signed-off-by: Michael Tremer --- diff --git a/.gitignore b/.gitignore index a711675c1..063c7a9e1 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,6 @@ /pakfire-client /pakfire-daemon /src/pakfire/__version__.py -/src/scripts/pakfire-daemon /src/systemd/*.service /tests/.root /tests/libpakfire/arch diff --git a/Makefile.am b/Makefile.am index 4ed9b788f..df1386edb 100644 --- a/Makefile.am +++ b/Makefile.am @@ -102,24 +102,11 @@ dist_doc_DATA = \ # ------------------------------------------------------------------------------ -bin_SCRIPTS = \ - src/scripts/pakfire-daemon - -EXTRA_DIST += \ - src/scripts/pakfire-daemon.in - -CLEANFILES += \ - src/scripts/pakfire-daemon - -# ------------------------------------------------------------------------------ - pakfire_PYTHON = \ src/pakfire/__init__.py \ src/pakfire/__version__.py \ - src/pakfire/buildservice.py \ src/pakfire/config.py \ src/pakfire/constants.py \ - src/pakfire/daemon.py \ src/pakfire/errors.py \ src/pakfire/i18n.py \ src/pakfire/logger.py \ diff --git a/src/pakfire/buildservice.py b/src/pakfire/buildservice.py deleted file mode 100644 index 08f9d5f87..000000000 --- a/src/pakfire/buildservice.py +++ /dev/null @@ -1,447 +0,0 @@ -############################################################################### -# # -# Pakfire - The IPFire package management system # -# Copyright (C) 2023 Pakfire development team # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -############################################################################### - -import asyncio -import json -import kerberos -import logging -import os - -import subprocess -import tempfile -import tornado.httpclient -import tornado.simple_httpclient -import tornado.websocket -import urllib.parse - -from .__version__ import PAKFIRE_VERSION -from . import _pakfire -from . import util - -# Configure some useful defaults for all requests -tornado.httpclient.AsyncHTTPClient.configure( - None, defaults = { - "user_agent" : "pakfire/%s" % PAKFIRE_VERSION, - }, -) - -# Setup logging -log = logging.getLogger("pakfire.buildservice") - -class AuthError(Exception): - """ - Raised when the client could not authenticate against the build service - """ - pass - - -class TransportError(Exception): - pass - - -class TemporaryConnectionError(TransportError): - """ - Raised when there is a temporary connection issue and - the request should be tried again. - """ - pass - - -class BuildService(_pakfire.BuildService): - """ - This wraps the parts of the build service - that has been implemented in libpakfire. - """ - def __init__(self): - super().__init__() - - # Initialise the HTTP client - self.client = tornado.httpclient.AsyncHTTPClient() - - async def _socket(self, path, **kwargs): - return await self._request("GET", path, - - # Enable websocket and ping once every ten seconds - websocket=True, - websocket_ping_interval=10, - websocket_ping_timeout=60, - - **kwargs, - ) - - async def _request(self, method, path, websocket=False, websocket_ping_interval=None, - websocket_ping_timeout=None, authenticate=True, - body=None, body_producer=None, on_message_callback=None, **kwargs): - headers = {} - query_args = {} - - # Make absolute URL - url = urllib.parse.urljoin(self.url, path) - - # Change scheme for websocket - if websocket and url.startswith("https://"): - url = url.replace("https://", "wss://") - - # Filter all query arguments - for arg in kwargs: - # Skip anything that is None - if kwargs[arg] is None: - continue - - # Add to query arguments - query_args[arg] = kwargs[arg] - - # Encode query arguments - query_args = urllib.parse.urlencode(query_args, doseq=True) - - # Add query arguments - if method in ("GET", "PUT", "DELETE"): - url = "%s?%s" % (url, query_args) - - # Add any arguments to the body - elif method == "POST": - if body is None: - body = query_args - - # Perform Kerberos authentication - if authenticate: - krb5_context = self._setup_krb5_context(url) - - # Fetch the Kerberos client response - krb5_client_response = kerberos.authGSSClientResponse(krb5_context) - - # Set the Negotiate header - headers |= { - "Authorization" : "Negotiate %s" % krb5_client_response, - } - - # Make the request - req = tornado.httpclient.HTTPRequest( - method=method, url=url, headers=headers, body=body, - - # Give the server more time to respond - request_timeout=60, - - # Add all the rest - body_producer=body_producer, - ) - - # Is this a web socket request? - if websocket: - return await tornado.websocket.websocket_connect( - req, - ping_interval=websocket_ping_interval, - ping_timeout=websocket_ping_timeout, - on_message_callback=on_message_callback, - ) - - # Send the request and wait for a response - try: - res = await self.client.fetch(req) - - # Catch any HTTP errors - except tornado.httpclient.HTTPError as e: - if e.code in (502, 503): - raise TemporaryConnectionError from e - - # Re-raise anything else - raise e - - # Perform mutual authentication - if authenticate: - for header in res.headers.get_list("WWW-Authenticate"): - # Skip anything that isn't a Negotiate header - if not header.startswith("Negotiate "): - continue - - # Fetch the server response - krb5_server_response = header.removeprefix("Negotiate ") - - # Validate the server response - result = kerberos.authGSSClientStep(krb5_context, krb5_server_response) - if not result == kerberos.AUTH_GSS_COMPLETE: - raise AuthError("Could not verify the Kerberos server response") - - log.debug("Kerberos Server Response validating succeeded") - - # Call this so that we won't end in the else block - break - - # If there were no headers - else: - raise AuthError("Mutual authentication failed") - - # Decode JSON response - if res.body: - return json.loads(res.body) - - # Empty response - return {} - - async def _proxy(self, cls, *args, **kwargs): - conn = cls(self, *args, **kwargs) - - # Create the initial connection - await conn.reconnect() - - return conn - - def _setup_krb5_context(self, url): - """ - Creates the Kerberos context that can be used to perform client - authentication against the server, and mutual authentication for the server. - """ - # Parse the input URL - url = urllib.parse.urlparse(url) - - # Create a new client context - result, krb5_context = kerberos.authGSSClientInit("HTTP@%s" % url.hostname) - - if not result == kerberos.AUTH_GSS_COMPLETE: - raise AuthError("Could not create Kerberos Client context") - - # Next step... - try: - result = kerberos.authGSSClientStep(krb5_context, "") - - except kerberos.GSSError as e: - log.error("Kerberos authentication failed: %s" % e) - - raise AuthError("%s" % e) from e - - if not result == kerberos.AUTH_GSS_CONTINUE: - raise AuthError("Cloud not continue Kerberos authentication") - - return krb5_context - - # Builder - - async def control(self, *args, **kwargs): - """ - Creates a control connection - """ - return await self._proxy(ControlConnection, *args, **kwargs) - - async def job(self, *args, **kwargs): - """ - Creates a control connection for a certain job - """ - return await self._proxy(JobControlConnection, *args, **kwargs) - - -class Connection(object): - def __init__(self, service, *args, **kwargs): - self.service = service - - # The active connection - self.conn = None - - # Callbacks - self.callbacks = {} - - # Perform custom initialization - self.init(*args, **kwargs) - - def init(self, *args, **kwargs): - pass - - @property - def url(self): - raise NotImplementedError - - async def connect(self): - """ - This will create a connection - """ - return await self.service._socket(self.url, - on_message_callback=self.on_message_callback) - - async def reconnect(self): - """ - Tries to reconnect for forever - """ - attempts = 0 - - while True: - attempts += 1 - - log.debug("Trying to reconnect (attempt %s)..." % attempts) - - try: - self.conn = await self.connect() - - # The web service responded with some error - except tornado.httpclient.HTTPClientError as e: - log.error("%s: Received HTTP Error %s" % (self.url, e.code)) - - # 502 - Proxy Error - # 503 - Service Unavailable - if e.code in (502, 503): - await asyncio.sleep(10) - - # Raise any unhandled errors - else: - raise e - - # The web service did not respond in time - except tornado.simple_httpclient.HTTPTimeoutError as e: - await asyncio.sleep(30) - - # Raise all other exceptions - except Exception as e: - raise e - - # If the connection was established successfully, we return - else: - return - - def close(self): - """ - Closes the connection - """ - if self.conn: - self.conn.close() - - def on_message_callback(self, message): - # Fail if no callbacks have been set - if not self.callbacks: - raise NotImplementedError - - # Decode the message - message = self._decode_json_message(message) - - # Ignore empty messages - if message is None: - return - - # Log the received message - log.debug("Received message:\n%s" % json.dumps(message, indent=4)) - - # Fetch the message type & data - type = message.get("type") - data = message.get("data") - - # Find a suitable callback - try: - callback = self.callbacks[type] - - # Log an error for unknown messages and ignore them - except KeyError: - log.error("Received message of unknown type '%s'" % type) - return - - # Call the callback - callback(data) - - @staticmethod - def _decode_json_message(message): - """ - Takes a received message and decodes it - """ - # Ignore empty messages - if message is None: - return - - try: - message = json.loads(message) - except json.JSONDecodeError: - log.error("Could not decode JSON message:\n%s" % message) - return - - return message - - async def write_message(self, message, **kwargs): - """ - Sends a message but encodes it into JSON first - """ - # This should never happen - if not self.conn: - raise RuntimeError("Not connected") - - if isinstance(message, dict): - message = tornado.escape.json_encode(message) - - try: - return await self.conn.write_message(message, **kwargs) - - except tornado.websocket.WebSocketClosedError as e: - # Try to reconnect - await self.reconnect() - - # Try to send the message again - return await self.write_message(message, **kwargs) - - -class ControlConnection(Connection): - url = "/api/v1/builders/control" - - def init(self, daemon): - self.daemon = daemon - - # Callbacks - self.callbacks = { - "job" : self.daemon.job_received, - } - - -class JobControlConnection(Connection): - """ - Proxy for Build Jobs - """ - def init(self, id, worker): - self.id = id - - # Callbacks - self.callbacks = { - "abort" : worker.abort, - } - - @property - def url(self): - return "/api/v1/jobs/%s" % self.id - - async def finished(self, success, packages=None, logfile=None): - """ - Will tell the hub that a job has finished - """ - # Upload the log file - if logfile: - logfile = await self.service.upload(logfile, filename="%s.log" % self.id) - - # Upload the packages - if packages: - for package in packages: - await self.service.upload(package) - - # Send request - self.service.job_finished(self.id, success=success, logfile=logfile, packages=packages) - - async def log(self, timestamp, level, message): - """ - Sends a log message to the hub - """ - await self.write_message({ - "type" : "log", - "data" : { - "timestamp" : timestamp, - "level" : level, - "message" : message, - }, - }) diff --git a/src/pakfire/daemon.py b/src/pakfire/daemon.py deleted file mode 100644 index ec794ee6f..000000000 --- a/src/pakfire/daemon.py +++ /dev/null @@ -1,425 +0,0 @@ -#!/usr/bin/python3 - -import asyncio -import functools -import glob -import io -import json -import logging -import logging.handlers -import multiprocessing -import os.path -import setproctitle -import signal -import socket -import tempfile - -from . import _pakfire -from . import buildservice -from . import config -from . import logger - -from pakfire.constants import * -from pakfire.i18n import _ - -# Setup logging -log = logging.getLogger("pakfire.daemon") - -class Daemon(object): - def __init__(self, config_file="daemon.conf", debug=False, verbose=False): - self.config = config.Config(config_file) - self.debug = debug - self.verbose = verbose - - # Setup logger - self.log = logger.setup( - "pakfire", - syslog_identifier="pakfire-daemon", - enable_console=self.verbose, - debug=self.debug, - ) - - # Initialize the connection to the buildservice - self.service = buildservice.BuildService() - - # Set when this process receives a shutdown signal - self._shutdown_signalled = None - - # List of worker processes. - self.workers = [] - - # Stats Connection - self.stats = None - - @property - def ccache_path(self): - """ - Returns the ccache path - """ - return self.config.get("daemon", "ccache_path", "/var/cache/pakfire/ccache") - - async def run(self): - """ - Main loop. - """ - # Register signal handlers. - self.register_signal_handlers() - - # Initialize shutdown signal - self._shutdown_signalled = asyncio.Event() - - # Create the control connection - self.control = await self.service.control(daemon=self) - - # Run main loop - while True: - # Submit stats - self.service.submit_stats() - - # Check if we are running by awaiting the shutdown signal - try: - await asyncio.wait_for(self._shutdown_signalled.wait(), timeout=5) - break - except asyncio.TimeoutError: - pass - - # Main loop has ended, but we wait until all workers have finished. - self.terminate_all_workers() - - def shutdown(self): - """ - Terminates all workers and exists the daemon. - """ - # Ignore if the main method has never been called - if not self._shutdown_signalled: - return - - # Ignore, if we are already shutting down - if self._shutdown_signalled.is_set(): - return - - self.log.info(_("Shutting down...")) - self._shutdown_signalled.set() - - # Close the control connection - if self.control: - self.control.close() - - def terminate_all_workers(self): - """ - Terminates all workers. - """ - self.log.debug("Sending SIGTERM to all workers") - - # First send SIGTERM to all processes - for worker in self.workers: - worker.terminate() - - self.log.debug("Waiting for workers to terminate") - - # Then wait until they all have finished. - for worker in self.workers: - worker.join() - - self.log.debug("All workers have finished") - - # Signal handling. - - def register_signal_handlers(self): - signal.signal(signal.SIGCHLD, self.handle_SIGCHLD) - signal.signal(signal.SIGINT, self.handle_SIGTERM) - signal.signal(signal.SIGTERM, self.handle_SIGTERM) - - def handle_SIGCHLD(self, signum, frame): - """ - Handle signal SIGCHLD. - """ - # Find the worker process that has terminated - for worker in self.workers: - # Skip any workers that are still alive - if worker.is_alive(): - continue - - self.log.debug("Worker %s has terminated with status %s" % \ - (worker.pid, worker.exitcode)) - - # Remove the worker from the list - try: - self.workers.remove(worker) - except ValueError: - pass - - # Close the process - worker.close() - - # We finish after handling one worker. If multiple workers have finished - # at the same time, this handler will be called again to handle it. - break - - def handle_SIGTERM(self, signum, frame): - """ - Handle signal SIGTERM. - """ - # Just shut down. - self.shutdown() - - def job_received(self, job): - """ - Called when this builder was assigned a new job - """ - # Launch a new worker - worker = Worker(self, job) - self.workers.append(worker) - - # Run it - worker.start() - - self.log.debug("Spawned a new worker process as PID %s" % worker.pid) - - -class Worker(multiprocessing.Process): - def __init__(self, daemon, data): - multiprocessing.Process.__init__(self) - self.daemon = daemon - - # The job that has been received - self.data = data - - @property - def service(self): - return self.daemon.service - - @property - def log(self): - return self.daemon.log - - def run(self): - self.log.debug("Worker %s has launched" % self.pid) - - # Register signal handlers - self.register_signal_handlers() - - # Run everything from here asynchronously - asyncio.run(self._work()) - - self.log.debug("Worker %s terminated gracefully" % self.pid) - - def is_test(self): - """ - Returns True if this job is a test job - """ - return self.data.get("test", False) - - @property - def job_id(self): - return self.data.get("id") - - @property - def ccache(self): - """ - ccache settings - """ - return self.data.get("ccache", {}) - - @property - def ccache_enabled(self): - return self.ccache.get("enabled", False) - - @property - def ccache_path(self): - """ - The ccache path for this job - """ - path = self.ccache.get("path", None) - - if path: - return os.path.join(self.daemon.ccache_path, path) - - async def _work(self): - """ - Called from the async IO loop doing all the work - """ - success = False - - # Check if we have received some useful data - if not self.job_id: - raise ValueError("Did not receive a job ID") - - # Set the process title - setproctitle.setproctitle("pakfire-worker job %s" % self.job_id) - - # Fetch the build architecture - arch = self.data.get("arch") - - # Fetch the package URL - pkg = self.data.get("pkg") - if not pkg: - raise ValueError("Did not received a package URL") - - # Connect to the service - self.job = await self.service.job(self.job_id, worker=self) - - # Setup build logger - logger = BuildLogger(self.log, self.job) - - # Create a temporary directory in which the built packages will be copied - with tempfile.TemporaryDirectory(prefix="pakfire-packages-") as target: - packages = [] - - # Run the build - try: - build = self.build(pkg, arch=arch, target=target, - logger=logger._log, build_id=self.job_id, - - # Always disable using snapshots - disable_snapshot=True, - - # ccache - disable_ccache=not self.ccache_enabled, - ccache_path=self.ccache_path, - ) - - # Wait until the build process is done and stream the log in the meantime - while not build.done(): - await logger.stream(timeout=1) - - # Await the build task (which would raise any exceptions) - await build - - # Catch any other Exception - except Exception as e: - raise e - - # The build has finished successfully - else: - success = True - - # Find any packages - if not self.is_test(): - packages = glob.glob("%s/*.pfm" % target) - - # Notify the service that the job has finished - finally: - await self.job.finished( - success=success, - logfile=logger.logfile.name, - packages=packages, - ) - - def build(self, *args, **kwargs): - """ - Sets up a new Pakfire instance and runs it in a new thread. - - This method returns an async.Task() object which can be used to track - if this job is still running. - """ - thread = asyncio.to_thread(self._build, *args, **kwargs) - - return asyncio.create_task(thread) - - def _build(self, pkg, arch=None, logger=None, **kwargs): - # Setup Pakfire instance - p = _pakfire.Pakfire(arch=arch, conf=self.pakfire_conf, logger=logger) - - # Run the build - return p.build(pkg, **kwargs) - - def shutdown(self): - self.log.debug("Shutting down worker %s" % self.pid) - - # XXX figure out what to do, when a build is running - - def abort(self, *args, **kwargs): - """ - Called to abort a running build immediately - """ - log.warning("Build job has been aborted") - - # XXX TODO - - # Signal handling. - - def register_signal_handlers(self): - signal.signal(signal.SIGCHLD, self.handle_SIGCHLD) - signal.signal(signal.SIGINT, self.handle_SIGTERM) - signal.signal(signal.SIGTERM, self.handle_SIGTERM) - - def handle_SIGCHLD(self, signum, frame): - """ - Handle signal SIGCHLD. - """ - # Must be here so that SIGCHLD won't be propagated to - # PakfireDaemon. - pass - - def handle_SIGTERM(self, signum, frame): - """ - Handle signal SIGTERM. - """ - self.shutdown() - - @functools.cached_property - def pakfire_conf(self): - """ - Writes the pakfire configuration to file and returns its path - """ - conf = self.data.get("conf") - - # Dump pakfire configuration - log.debug("Pakfire configuration:\n%s" % conf) - - return io.StringIO(conf) - - -class BuildLogger(object): - """ - This class groups together all sorts of logging. - """ - def __init__(self, log, job): - self.log = log - self.job = job - - # Create a logfile - self.logfile = tempfile.NamedTemporaryFile(mode="w") - - # Create a FIFO queue to buffer any log messages - self.queue = asyncio.Queue() - - # Create a new logger - self.logger = self.log.getChild(self.job.id) - self.logger.setLevel(logging.DEBUG) - - # Log everything to the queue - handler = logging.handlers.QueueHandler(self.queue) - handler.setLevel(logging.INFO) - self.logger.addHandler(handler) - - # Log everything to the file - handler = logging.StreamHandler(self.logfile) - handler.setLevel(logging.INFO) - self.logger.addHandler(handler) - - def _log(self, level, message): - # Remove any trailing newline (but only one) - if message: - message = message.removesuffix("\n") - - return self.logger.log(level, message) - - async def stream(self, timeout=0): - while True: - # Fetch a message from the queue - try: - message = await asyncio.wait_for(self.queue.get(), timeout=timeout) - - # If we did not receive any messages within the timeout, - # we return control back to the caller - except asyncio.TimeoutError as e: - break - - # Ignore any empty messages - if message is None: - continue - - # Send message to the service - await self.job.log(message.created, message.levelno, message.getMessage()) diff --git a/src/scripts/pakfire-daemon.in b/src/scripts/pakfire-daemon.in deleted file mode 100644 index bc4ff6c77..000000000 --- a/src/scripts/pakfire-daemon.in +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/python3 -############################################################################## -# # -# Pakfire - The IPFire package management system # -# Copyright (C) 2021 Pakfire development team # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -############################################################################### - -import asyncio -import argparse -import sys - -import pakfire.daemon -from pakfire.i18n import _ - -class Cli(object): - def parse_cli(self): - parser = argparse.ArgumentParser( - description = _("Pakfire daemon command line interface"), - ) - - parser.add_argument("--config", "-c", nargs="?", - default="@sysconfdir@/pakfire/daemon.conf", - help=_("Configuration file to load")) - parser.add_argument("--debug", action="store_true", - help=_("Enable debug mode"), - ) - parser.add_argument("--verbose", action="store_true", - help=_("Enable logging output on the console"), - ) - - return parser.parse_args() - - def __call__(self): - """ - Runs the daemon - """ - args = self.parse_cli() - - # Initialize the daemon - d = pakfire.daemon.Daemon( - args.config, - debug=args.debug, - verbose=args.verbose, - ) - - # Run it - try: - r = asyncio.run(d.run()) - - except (SystemExit, KeyboardInterrupt): - d.shutdown() - - # Return the exit code - sys.exit(r or 0) - -if __name__ == "__main__": - c = Cli() - c()