import dis
import os.path
import re
+import signal
import subprocess
import sys
import sysconfig
)
+USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
+
+def create_process_group(*args, **kwargs):
+ if USE_PROCESS_GROUP:
+ kwargs['start_new_session'] = True
+ return subprocess.Popen(*args, **kwargs)
+
+def kill_process_group(proc):
+ if USE_PROCESS_GROUP:
+ try:
+ os.killpg(proc.pid, signal.SIGKILL)
+ except ProcessLookupError:
+ pass
+ else:
+ proc.kill()
+ proc.communicate() # Clean up
+
+
class TraceBackend:
EXTENSION = None
COMMAND = None
program = self.PROGRAMS[name].format(python=sys.executable)
try:
- proc = subprocess.Popen(
+ proc = create_process_group(
["bpftrace", "-e", program, "-c", " ".join(subcommand)],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate(timeout=60)
except subprocess.TimeoutExpired:
- proc.kill()
+ kill_process_group(proc)
raise AssertionError("bpftrace timed out")
except (FileNotFoundError, PermissionError) as e:
raise unittest.SkipTest(f"bpftrace not available: {e}")
# Check if bpftrace is available and can attach to USDT probes
program = f'usdt:{sys.executable}:python:function__entry {{ printf("probe: success\\n"); exit(); }}'
try:
- proc = subprocess.Popen(
+ proc = create_process_group(
["bpftrace", "-e", program, "-c", f"{sys.executable} -c pass"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = proc.communicate(timeout=10)
except subprocess.TimeoutExpired:
- proc.kill()
- proc.communicate() # Clean up
+ kill_process_group(proc)
raise unittest.SkipTest("bpftrace timed out during usability check")
except OSError as e:
raise unittest.SkipTest(f"bpftrace not available: {e}")