import asyncio
import os
import pkgutil
+import signal
+import sys
import time
from asyncio import create_subprocess_exec, create_subprocess_shell
from pathlib import PurePath
from knot_resolver_manager.compat.asyncio import to_thread
+def unblock_signals():
+ if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) # type: ignore
+ else:
+ # the list of signals is not exhaustive, but it should cover all signals we might ever want to block
+ signal.pthread_sigmask(
+ signal.SIG_UNBLOCK,
+ {
+ signal.SIGHUP,
+ signal.SIGINT,
+ signal.SIGTERM,
+ signal.SIGUSR1,
+ signal.SIGUSR2,
+ },
+ )
+
+
async def call(
cmd: Union[str, bytes, List[str], List[bytes]], shell: bool = False, discard_output: bool = False
) -> int:
"""
custom async alternative to subprocess.call()
"""
- kwargs: Dict[str, Any] = {}
+ kwargs: Dict[str, Any] = {
+ "preexec_fn": unblock_signals,
+ }
if discard_output:
kwargs["stdout"] = asyncio.subprocess.DEVNULL
kwargs["stderr"] = asyncio.subprocess.DEVNULL