import os
+import sys
import time
import socket
import logging
import subprocess as sp
-from shutil import which
from contextlib import contextmanager
import pytest
return
logging.info("starting proxy")
-
- if not (pproxy := which("pproxy")):
- raise ValueError("pproxy program not found")
- cmdline = [pproxy, "--reuse"]
- cmdline.extend(["-l", f"tunnel://:{self.client_port}"])
- cmdline.extend(["-r", f"tunnel://{self.server_host}:{self.server_port}"])
+ cmdline = [sys.executable, "-m", "tests.pproxy_fix", "--reuse"]
+ cmdline += ["-l", f"tunnel://:{self.client_port}"]
+ cmdline += ["-r", f"tunnel://{self.server_host}:{self.server_port}"]
self.proc = sp.Popen(cmdline, stdout=sp.DEVNULL)
logging.info("proxy started")
--- /dev/null
+#!/usr/bin/env python
+"""Wrapper for pproxy to fix Python 3.14 compatibility
+
+Work around https://github.com/qwj/python-proxy/issues/200
+"""
+
+import sys
+import asyncio
+from typing import Any
+
+from pproxy.server import main as main_ # type: ignore[import-untyped]
+
+
+def main() -> Any:
+ # Before Python 3.14 `get_event_loop()` used to create a new loop.
+ # From Python 3.14 it raises a `RuntimeError`.
+ try:
+ asyncio.get_event_loop()
+ except RuntimeError:
+ asyncio.set_event_loop(asyncio.new_event_loop())
+
+ return main_()
+
+
+if __name__ == "__main__":
+ sys.exit(main())