--- /dev/null
+FROM debian:latest
+
+ENV LC_ALL=C.UTF-8
+
+# install project dependencies
+
+## build essentials
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y build-essential git ca-certificates
+
+## python
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y python3 python3-pip python3-dev
+
+## glib dependencies
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y libcairo2-dev libglib2.0-0 libgirepository1.0-dev
+
+## python setuptools
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y python3-setuptools
+
+## python libraries
+RUN pip3 install aiohttp strictyaml pydbus PyGObject
+
+## systemd
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y systemd
+
+## kresd
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y wget
+RUN wget https://secure.nic.cz/files/knot-resolver/knot-resolver-release.deb && dpkg -i knot-resolver-release.deb
+RUN apt-get update && apt-get install -y knot-resolver
+
+# dbus
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y dbus
+
+
+# install test dependencies
+RUN apt-get update && apt-get install --no-install-recommends --no-install-suggests -y curl
+
+CMD ["/bin/systemd"]
--- /dev/null
+[Unit]
+Description=Knot Resolver Manager
+Requires=dbus
+After=dbus
+
+[Service]
+WorkingDirectory=/repo
+ExecStart=/usr/bin/python3 -m knot_resolver_manager
+KillSignal=SIGINT
+
+[Install]
+WantedBy=multi-user.target
--- /dev/null
+#!/bin/bash
+
+# fail early
+set -e
+
+cd /test
+echo "Starting manager..."
+cp knot-manager.service /etc/systemd/system
+systemctl daemon-reload
+systemctl start knot-manager.service
+
+
+echo -e "lua_config: \"\"\nnum_workers: 4" | curl -X POST --unix-socket /tmp/manager.sock localhost/config -f --data-binary @-
+
+# assert that any kresd process is running
+systemctl status | grep kresd
+import asyncio
+import signal
from aiohttp import web
from knot_resolver_manager.kresd_manager import KresdManager
from . import confmodel
from . import compat
+_SOCKET_PATH = '/tmp/manager.sock'
async def hello(_request: web.Request) -> web.Response:
return web.Response(text="Hello, world")
# initialize KresdManager
manager = KresdManager()
- compat.asyncio_run(manager.load_system_state())
app["kresd_manager"] = manager
+ async def init_manager(app):
+ await app['kresd_manager'].load_system_state()
+ app.on_startup.append(init_manager)
# configure routing
app.add_routes([web.get("/", hello), web.post("/config", apply_config)])
# run forever
- web.run_app(app, path="./manager.sock")
+ web.run_app(app, path=_SOCKET_PATH)
if __name__ == "__main__":
from typing import List, Union
from typing_extensions import Literal
+from threading import Thread
from pydbus import SystemBus
from gi.repository import GLib
def _wait_for_job_completion(systemd, job):
- global result_state
-
- loop = GLib.MainLoop()
- systemd.JobRemoved.connect(_wait_for_job_completion_handler(loop, job))
- result_state = None
- loop.run()
+ def event_loop_isolation_thread():
+ global result_state
- if result_state != "done":
- raise SystemdException(
- f"Job completed with state '{result_state}' instead of expected 'done'"
- )
+ loop = GLib.MainLoop()
+ systemd.JobRemoved.connect(_wait_for_job_completion_handler(loop, job))
+ result_state = None
+ loop.run()
+
+ if result_state != "done":
+ raise SystemdException(
+ f"Job completed with state '{result_state}' instead of expected 'done'"
+ )
+
+ thread = Thread(target=event_loop_isolation_thread)
+ thread.start()
+ thread.join()
def _wait_for_job_completion_handler(loop, job_path):