async def notify() -> None:
loop = asyncio.get_running_loop()
- try:
- while True:
- s, _ = await loop.sock_accept(vsock)
+ while True:
+ s, _ = await loop.sock_accept(vsock)
- for msg in (await loop.sock_recv(s, 4096)).decode().split("\n"):
- if not msg:
- continue
+ for msg in (await loop.sock_recv(s, 4096)).decode().split("\n"):
+ if not msg:
+ continue
- k, _, v = msg.partition("=")
- messages[k] = v
-
- except asyncio.CancelledError:
- pass
+ k, _, v = msg.partition("=")
+ messages[k] = v
with MkosiAsyncioThread(notify()):
yield f"vsock:{socket.VMADDR_CID_HOST}:{vsock.getsockname()[1]}", messages
The default threading.Thread() is not interruptable, so we make our own version by using the concurrency
feature in python that is interruptable, namely asyncio.
- Additionally, we store the result of the coroutine in the result variable so it can be accessed easily
- after the thread finishes.
+ Additionally, we store any exception that the coroutine raises and re-raise it in join() if no other
+ exception was raised before.
"""
def __init__(self, target: Awaitable[Any], *args: Any, **kwargs: Any) -> None:
self.target = target
self.loop: queue.SimpleQueue[asyncio.AbstractEventLoop] = queue.SimpleQueue()
+ self.exc: queue.SimpleQueue[BaseException] = queue.SimpleQueue()
super().__init__(*args, **kwargs)
def run(self) -> None:
self.loop.put(asyncio.get_running_loop())
await self.target
- asyncio.run(wrapper())
+ try:
+ asyncio.run(wrapper())
+ except asyncio.CancelledError:
+ pass
+ except BaseException as e:
+ self.exc.put(e)
def cancel(self) -> None:
loop = self.loop.get()
) -> None:
self.cancel()
self.join()
+
+ if type is None:
+ try:
+ raise self.exc.get_nowait()
+ except queue.Empty:
+ pass