import logging
import subprocess as sp
from shutil import which
+from contextlib import contextmanager
import pytest
def pytest_collection_modifyitems(items):
for item in items:
# TODO: there is a race condition on macOS and Windows in the CI:
- # listen returns before really listening and tests based on 'deaf_port'
+ # listen returns before really listening and tests based on 'deaf_listen'
# fail 50% of the times. Just add the 'proxy' mark on these tests
# because they are already skipped in the CI.
- if "proxy" in item.fixturenames or "deaf_port" in item.fixturenames:
+ if "proxy" in item.fixturenames:
item.add_marker(pytest.mark.proxy)
p.stop()
-@pytest.fixture
-def deaf_port(dsn):
- """Return a port number with a socket open but not answering"""
- with socket.socket(socket.AF_INET) as s:
- s.bind(("", 0))
- port = s.getsockname()[1]
- s.listen(0)
- yield port
-
-
class Proxy:
"""
Proxy a Postgres service for testing purpose.
logging.info("proxy stopped")
self.proc = None
+ @contextmanager
+ def deaf_listen(self):
+ """Open the proxy port to listen, but without accepting a connection.
+
+ A connection attempt on the proxy `client_host` and `client_port` will
+ block. Useful to test connection timeouts.
+ """
+ if self.proc:
+ raise Exception("the proxy is already listening")
+
+ with socket.socket(socket.AF_INET) as s:
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ s.bind((self.client_host, self.client_port))
+ s.listen(0)
+ yield s
+
@classmethod
def _get_random_port(cls):
with socket.socket() as s:
@pytest.mark.slow
@pytest.mark.skipif("ver(psycopg.__version__) < ver('3.0.8')")
-def test_no_queue_timeout(deaf_port):
- with pool.NullConnectionPool(kwargs={"host": "localhost", "port": deaf_port}) as p:
- with pytest.raises(pool.PoolTimeout):
+def test_no_queue_timeout(proxy):
+ with pool.NullConnectionPool(
+ kwargs={"host": proxy.client_host, "port": proxy.client_port}
+ ) as p:
+ with proxy.deaf_listen(), pytest.raises(pool.PoolTimeout):
with p.connection(timeout=1):
pass
@pytest.mark.slow
@pytest.mark.skipif("ver(psycopg.__version__) < ver('3.0.8')")
-async def test_no_queue_timeout(deaf_port):
+async def test_no_queue_timeout(proxy):
async with pool.AsyncNullConnectionPool(
- kwargs={"host": "localhost", "port": deaf_port}
+ kwargs={"host": proxy.client_host, "port": proxy.client_port}
) as p:
- with pytest.raises(pool.PoolTimeout):
+ with proxy.deaf_listen(), pytest.raises(pool.PoolTimeout):
async with p.connection(timeout=1):
pass
@pytest.mark.slow
@pytest.mark.timing
-def test_connect_timeout(conn_cls, deaf_port):
- t0 = time.time()
- with pytest.raises(psycopg.OperationalError, match="timeout expired"):
- conn_cls.connect(host="localhost", port=deaf_port, connect_timeout=2)
- elapsed = time.time() - t0
+def test_connect_timeout(conn_cls, proxy):
+ with proxy.deaf_listen():
+ t0 = time.time()
+ with pytest.raises(psycopg.OperationalError, match="timeout expired"):
+ conn_cls.connect(proxy.client_dsn, connect_timeout=2)
+ elapsed = time.time() - t0
assert elapsed == pytest.approx(2.0, abs=0.05)
@pytest.mark.slow
@pytest.mark.timing
-def test_multi_hosts(conn_cls, proxy, dsn, deaf_port, monkeypatch):
+def test_multi_hosts(conn_cls, proxy, dsn, monkeypatch):
args = conninfo_to_dict(dsn)
args["host"] = f"{proxy.client_host},{proxy.server_host}"
- args["port"] = f"{deaf_port},{proxy.server_port}"
+ args["port"] = f"{proxy.client_port},{proxy.server_port}"
args.pop("hostaddr", None)
monkeypatch.setattr(psycopg.conninfo, "_DEFAULT_CONNECT_TIMEOUT", 2)
- t0 = time.time()
- with conn_cls.connect(**args) as conn:
- elapsed = time.time() - t0
- assert 2.0 < elapsed < 2.5
- assert conn.info.port == int(proxy.server_port)
- assert conn.info.host == proxy.server_host
+ with proxy.deaf_listen():
+ t0 = time.time()
+ with conn_cls.connect(**args) as conn:
+ elapsed = time.time() - t0
+ assert 2.0 < elapsed < 2.5
+ assert conn.info.port == int(proxy.server_port)
+ assert conn.info.host == proxy.server_host
@pytest.mark.slow
@pytest.mark.timing
-def test_multi_hosts_timeout(conn_cls, proxy, dsn, deaf_port):
+def test_multi_hosts_timeout(conn_cls, proxy, dsn):
args = conninfo_to_dict(dsn)
args["host"] = f"{proxy.client_host},{proxy.server_host}"
- args["port"] = f"{deaf_port},{proxy.server_port}"
+ args["port"] = f"{proxy.client_port},{proxy.server_port}"
args.pop("hostaddr", None)
args["connect_timeout"] = "2"
- t0 = time.time()
- with conn_cls.connect(**args) as conn:
- elapsed = time.time() - t0
- assert 2.0 < elapsed < 2.5
- assert conn.info.port == int(proxy.server_port)
- assert conn.info.host == proxy.server_host
+ with proxy.deaf_listen():
+ t0 = time.time()
+ with conn_cls.connect(**args) as conn:
+ elapsed = time.time() - t0
+ assert 2.0 < elapsed < 2.5
+ assert conn.info.port == int(proxy.server_port)
+ assert conn.info.host == proxy.server_host
def test_close(conn):
@pytest.mark.slow
@pytest.mark.timing
-async def test_connect_timeout(aconn_cls, deaf_port):
- t0 = time.time()
- with pytest.raises(psycopg.OperationalError, match="timeout expired"):
- await aconn_cls.connect(host="localhost", port=deaf_port, connect_timeout=2)
- elapsed = time.time() - t0
+async def test_connect_timeout(aconn_cls, proxy):
+ with proxy.deaf_listen():
+ t0 = time.time()
+ with pytest.raises(psycopg.OperationalError, match="timeout expired"):
+ await aconn_cls.connect(proxy.client_dsn, connect_timeout=2)
+ elapsed = time.time() - t0
assert elapsed == pytest.approx(2.0, abs=0.05)
@pytest.mark.slow
@pytest.mark.timing
-async def test_multi_hosts(aconn_cls, proxy, dsn, deaf_port, monkeypatch):
+async def test_multi_hosts(aconn_cls, proxy, dsn, monkeypatch):
args = conninfo_to_dict(dsn)
args["host"] = f"{proxy.client_host},{proxy.server_host}"
- args["port"] = f"{deaf_port},{proxy.server_port}"
+ args["port"] = f"{proxy.client_port},{proxy.server_port}"
args.pop("hostaddr", None)
monkeypatch.setattr(psycopg.conninfo, "_DEFAULT_CONNECT_TIMEOUT", 2)
- t0 = time.time()
- async with await aconn_cls.connect(**args) as conn:
- elapsed = time.time() - t0
- assert 2.0 < elapsed < 2.5
- assert conn.info.port == int(proxy.server_port)
- assert conn.info.host == proxy.server_host
+ with proxy.deaf_listen():
+ t0 = time.time()
+ async with await aconn_cls.connect(**args) as conn:
+ elapsed = time.time() - t0
+ assert 2.0 < elapsed < 2.5
+ assert conn.info.port == int(proxy.server_port)
+ assert conn.info.host == proxy.server_host
@pytest.mark.slow
@pytest.mark.timing
-async def test_multi_hosts_timeout(aconn_cls, proxy, dsn, deaf_port):
+async def test_multi_hosts_timeout(aconn_cls, proxy, dsn):
args = conninfo_to_dict(dsn)
args["host"] = f"{proxy.client_host},{proxy.server_host}"
- args["port"] = f"{deaf_port},{proxy.server_port}"
+ args["port"] = f"{proxy.client_port},{proxy.server_port}"
args.pop("hostaddr", None)
args["connect_timeout"] = "2"
- t0 = time.time()
- async with await aconn_cls.connect(**args) as conn:
- elapsed = time.time() - t0
- assert 2.0 < elapsed < 2.5
- assert conn.info.port == int(proxy.server_port)
- assert conn.info.host == proxy.server_host
+ with proxy.deaf_listen():
+ t0 = time.time()
+ async with await aconn_cls.connect(**args) as conn:
+ elapsed = time.time() - t0
+ assert 2.0 < elapsed < 2.5
+ assert conn.info.port == int(proxy.server_port)
+ assert conn.info.host == proxy.server_host
async def test_close(aconn):