This is consistent with what the libpq does.
Move timeout calculation to a function in conninfo module and don't
change the connect_timeout parameter explicitly in the connection
string.
Drop awful drop_default_args_from_conninfo() from tests.
from ._compat import LiteralString
from .pq.misc import connection_summary
from .conninfo import make_conninfo, conninfo_to_dict, ConnectionInfo
-from .conninfo import conninfo_attempts, ConnDict
+from .conninfo import conninfo_attempts, ConnDict, timeout_from_conninfo
from ._pipeline import BasePipeline, Pipeline
from .generators import notifies, connect, execute
from ._encodings import pgconn_encoding
ConnStatus = pq.ConnStatus
TransactionStatus = pq.TransactionStatus
- # Default timeout for connection a attempt.
- # Arbitrary timeout, what applied by the libpq on my computer.
- # Your mileage won't vary.
- _DEFAULT_CONNECT_TIMEOUT = 130
-
def __init__(self, pgconn: "PGconn"):
self.pgconn = pgconn
self._autocommit = False
Connect to a database server and return a new `Connection` instance.
"""
params = cls._get_connection_params(conninfo, **kwargs)
- timeout = int(params["connect_timeout"])
+ timeout = timeout_from_conninfo(params)
rv = None
attempts = conninfo_attempts(params)
for attempt in attempts:
:return: Connection arguments merged and eventually modified, in a
format similar to `~conninfo.conninfo_to_dict()`.
"""
- params = conninfo_to_dict(conninfo, **kwargs)
-
- # Make sure there is an usable connect_timeout
- if "connect_timeout" in params:
- params["connect_timeout"] = int(params["connect_timeout"])
- else:
- # The sync connect function will stop on the default socket timeout
- # Because in async connection mode we need to enforce the timeout
- # ourselves, we need a finite value.
- params["connect_timeout"] = cls._DEFAULT_CONNECT_TIMEOUT
-
- return params
+ return conninfo_to_dict(conninfo, **kwargs)
def close(self) -> None:
"""Close the database connection."""
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
from .adapt import AdaptersMap
from ._enums import IsolationLevel
-from .conninfo import ConnDict, make_conninfo, conninfo_to_dict, conninfo_attempts_async
+from .conninfo import ConnDict, make_conninfo, conninfo_to_dict
+from .conninfo import conninfo_attempts_async, timeout_from_conninfo
from ._pipeline import AsyncPipeline
from ._encodings import pgconn_encoding
from .connection import BaseConnection, CursorRow, Notify
)
params = await cls._get_connection_params(conninfo, **kwargs)
- timeout = int(params["connect_timeout"])
+ timeout = timeout_from_conninfo(params)
rv = None
attempts = await conninfo_attempts_async(params)
for attempt in attempts:
@classmethod
async def _get_connection_params(cls, conninfo: str, **kwargs: Any) -> ConnDict:
"""Manipulate connection parameters before connecting."""
- params = conninfo_to_dict(conninfo, **kwargs)
-
- # Make sure there is an usable connect_timeout
- if "connect_timeout" in params:
- params["connect_timeout"] = int(params["connect_timeout"])
- else:
- # The sync connect function will stop on the default socket timeout
- # Because in async connection mode we need to enforce the timeout
- # ourselves, we need a finite value.
- params["connect_timeout"] = cls._DEFAULT_CONNECT_TIMEOUT
-
- return params
+ return conninfo_to_dict(conninfo, **kwargs)
async def close(self) -> None:
if self.closed:
ConnDict: TypeAlias = "dict[str, Any]"
+# Default timeout for connection a attempt.
+# Arbitrary timeout, what applied by the libpq on my computer.
+# Your mileage won't vary.
+_DEFAULT_CONNECT_TIMEOUT = 130
+
logger = logging.getLogger("psycopg")
return [{**params, "hostaddr": item[4][0]} for item in ans]
+def timeout_from_conninfo(params: ConnDict) -> int:
+ """
+ Return the timeout in seconds from the connection parameters.
+ """
+ # Follow the libpq convention:
+ #
+ # - 0 or less means no timeout (but we will use a default to simulate
+ # the socket timeout)
+ # - at least 2 seconds.
+ #
+ # See connectDBComplete in fe-connect.c
+ value = params.get("connect_timeout", _DEFAULT_CONNECT_TIMEOUT)
+ try:
+ timeout = int(value)
+ except ValueError:
+ raise e.ProgrammingError(f"bad value for connect_timeout: {value!r}")
+
+ if timeout <= 0:
+ # The sync connect function will stop on the default socket timeout
+ # Because in async connection mode we need to enforce the timeout
+ # ourselves, we need a finite value.
+ timeout = _DEFAULT_CONNECT_TIMEOUT
+ elif timeout < 2:
+ # Enforce a 2s min
+ timeout = 2
+
+ return timeout
+
+
def _get_param(params: ConnDict, name: str) -> str | None:
"""
Return a value from a connection string.
import psycopg
from psycopg import Notify, pq, errors as e
from psycopg.rows import tuple_row
-from psycopg.conninfo import conninfo_to_dict, make_conninfo
+from psycopg.conninfo import conninfo_to_dict
+from psycopg.conninfo import timeout_from_conninfo, _DEFAULT_CONNECT_TIMEOUT
from .test_cursor import my_row_factory
from .test_adapt import make_bin_dumper, make_dumper
-DEFAULT_TIMEOUT = psycopg.Connection._DEFAULT_CONNECT_TIMEOUT
-
def test_connect(conn_cls, dsn):
conn = conn_cls.connect(dsn)
def test_connect_timeout(conn_cls, deaf_port):
t0 = time.time()
with pytest.raises(psycopg.OperationalError, match="timeout expired"):
- conn_cls.connect(host="localhost", port=deaf_port, connect_timeout=1)
+ conn_cls.connect(host="localhost", port=deaf_port, connect_timeout=2)
elapsed = time.time() - t0
- assert elapsed == pytest.approx(1.0, abs=0.05)
+ assert elapsed == pytest.approx(2.0, abs=0.05)
@pytest.mark.slow
args["host"] = f"{proxy.client_host},{proxy.server_host}"
args["port"] = f"{deaf_port},{proxy.server_port}"
args.pop("hostaddr", None)
- monkeypatch.setattr(conn_cls, "_DEFAULT_CONNECT_TIMEOUT", 2)
+ monkeypatch.setattr(psycopg.conninfo, "_DEFAULT_CONNECT_TIMEOUT", 2)
t0 = time.time()
with conn_cls.connect(**args) as conn:
elapsed = time.time() - t0
args["host"] = f"{proxy.client_host},{proxy.server_host}"
args["port"] = f"{deaf_port},{proxy.server_port}"
args.pop("hostaddr", None)
- args["connect_timeout"] = "1"
+ args["connect_timeout"] = "2"
t0 = time.time()
with conn_cls.connect(**args) as conn:
elapsed = time.time() - t0
- assert 1.0 < elapsed < 1.5
+ assert 2.0 < elapsed < 2.5
assert conn.info.port == int(proxy.server_port)
assert conn.info.host == proxy.server_host
setpgenv({})
monkeypatch.setattr(psycopg.connection, "connect", fake_connect)
conn = conn_cls.connect(*args, **kwargs)
- got_params = drop_default_args_from_conninfo(got_conninfo)
- assert got_params == conninfo_to_dict(want)
+ assert conninfo_to_dict(got_conninfo) == conninfo_to_dict(want)
conn.close()
(
"",
{"dbname": "mydb", "connect_timeout": None},
- ({"dbname": "mydb"}, DEFAULT_TIMEOUT),
+ ({"dbname": "mydb"}, _DEFAULT_CONNECT_TIMEOUT),
),
(
"",
{"dbname": "mydb", "connect_timeout": 1},
- ({"dbname": "mydb", "connect_timeout": "1"}, 1),
+ ({"dbname": "mydb", "connect_timeout": 1}, 2),
),
(
"dbname=postgres",
{},
- ({"dbname": "postgres"}, DEFAULT_TIMEOUT),
+ ({"dbname": "postgres"}, _DEFAULT_CONNECT_TIMEOUT),
),
(
"dbname=postgres connect_timeout=2",
(
"postgresql:///postgres?connect_timeout=2",
{"connect_timeout": 10},
- ({"dbname": "postgres", "connect_timeout": "10"}, 10),
+ ({"dbname": "postgres", "connect_timeout": 10}, 10),
),
]
@pytest.mark.parametrize("dsn, kwargs, exp", conninfo_params_timeout)
def test_get_connection_params(conn_cls, dsn, kwargs, exp):
params = conn_cls._get_connection_params(dsn, **kwargs)
- conninfo = make_conninfo(**params)
- assert drop_default_args_from_conninfo(conninfo) == exp[0]
- assert params["connect_timeout"] == exp[1]
+ assert params == exp[0]
+ assert timeout_from_conninfo(params) == exp[1]
def test_connect_context(conn_cls, dsn):
def test_cancel_closed(conn):
conn.close()
conn.cancel()
-
-
-def drop_default_args_from_conninfo(conninfo):
- if isinstance(conninfo, str):
- params = conninfo_to_dict(conninfo)
- else:
- params = conninfo.copy()
-
- def removeif(key, value):
- if params.get(key) == value:
- params.pop(key)
-
- removeif("connect_timeout", str(DEFAULT_TIMEOUT))
-
- return params
import psycopg
from psycopg import Notify, errors as e
from psycopg.rows import tuple_row
-from psycopg.conninfo import conninfo_to_dict, make_conninfo
+from psycopg.conninfo import conninfo_to_dict, timeout_from_conninfo
from .test_cursor import my_row_factory
from .test_connection import tx_params, tx_params_isolation, tx_values_map
-from .test_connection import conninfo_params_timeout, drop_default_args_from_conninfo
+from .test_connection import conninfo_params_timeout
from .test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
from .test_conninfo import fake_resolve # noqa: F401
async def test_connect_timeout(aconn_cls, deaf_port):
t0 = time.time()
with pytest.raises(psycopg.OperationalError, match="timeout expired"):
- await aconn_cls.connect(host="localhost", port=deaf_port, connect_timeout=1)
+ await aconn_cls.connect(host="localhost", port=deaf_port, connect_timeout=2)
elapsed = time.time() - t0
- assert elapsed == pytest.approx(1.0, abs=0.05)
+ assert elapsed == pytest.approx(2.0, abs=0.05)
@pytest.mark.slow
args["host"] = f"{proxy.client_host},{proxy.server_host}"
args["port"] = f"{deaf_port},{proxy.server_port}"
args.pop("hostaddr", None)
- monkeypatch.setattr(aconn_cls, "_DEFAULT_CONNECT_TIMEOUT", 2)
+ monkeypatch.setattr(psycopg.conninfo, "_DEFAULT_CONNECT_TIMEOUT", 2)
t0 = time.time()
async with await aconn_cls.connect(**args) as conn:
elapsed = time.time() - t0
args["host"] = f"{proxy.client_host},{proxy.server_host}"
args["port"] = f"{deaf_port},{proxy.server_port}"
args.pop("hostaddr", None)
- args["connect_timeout"] = "1"
+ args["connect_timeout"] = "2"
t0 = time.time()
async with await aconn_cls.connect(**args) as conn:
elapsed = time.time() - t0
- assert 1.0 < elapsed < 1.5
+ assert 2.0 < elapsed < 2.5
assert conn.info.port == int(proxy.server_port)
assert conn.info.host == proxy.server_host
setpgenv({})
monkeypatch.setattr(psycopg.connection, "connect", fake_connect)
conn = await aconn_cls.connect(*args, **kwargs)
- got_params = drop_default_args_from_conninfo(got_conninfo)
- assert got_params == conninfo_to_dict(want)
+ assert conninfo_to_dict(got_conninfo) == conninfo_to_dict(want)
await conn.close()
async def test_get_connection_params(aconn_cls, dsn, kwargs, exp, setpgenv):
setpgenv({})
params = await aconn_cls._get_connection_params(dsn, **kwargs)
- conninfo = make_conninfo(**params)
- assert drop_default_args_from_conninfo(conninfo) == exp[0]
- assert params["connect_timeout"] == exp[1]
+ assert params == exp[0]
+ assert timeout_from_conninfo(params) == exp[1]
async def test_connect_context_adapters(aconn_cls, dsn):
assert len(got) == 1
want = {"host": "foo.com", "hostaddr": "1.1.1.1"}
- assert drop_default_args_from_conninfo(got[0]) == want
+ assert conninfo_to_dict(got[0]) == want
from psycopg.conninfo import conninfo_to_dict
from .test_conninfo import fake_resolve # noqa: F401 # fixture
-from .test_connection import drop_default_args_from_conninfo
@pytest.mark.usefixtures("fake_resolve")
assert len(got) == 1
want = {"host": "foo.com", "hostaddr": "1.1.1.1"}
- assert drop_default_args_from_conninfo(got[0]) == want
+ assert conninfo_to_dict(got[0]) == want
@pytest.mark.dns
from psycopg._cmodule import _psycopg
from psycopg.conninfo import conninfo_to_dict
-from .test_connection import drop_default_args_from_conninfo
-
@pytest.mark.parametrize(
"args, kwargs, want",
orig_connect = psycopg.connection.connect # type: ignore
- got_conninfo = None
+ got_conninfo: str
def mock_connect(conninfo):
nonlocal got_conninfo
monkeypatch.setattr(psycopg.connection, "connect", mock_connect)
conn = psycopg.connect(*args, **kwargs)
- assert drop_default_args_from_conninfo(got_conninfo) == conninfo_to_dict(want)
+ assert conninfo_to_dict(got_conninfo) == conninfo_to_dict(want)
conn.close()
from . import dbapi20
from . import dbapi20_tpc
-from .test_connection import drop_default_args_from_conninfo
@pytest.fixture(scope="class")
setpgenv({})
monkeypatch.setattr(psycopg.connection, "connect", fake_connect)
conn = psycopg.connect(*args, **kwargs)
- assert drop_default_args_from_conninfo(got_conninfo) == conninfo_to_dict(want)
+ assert conninfo_to_dict(got_conninfo) == conninfo_to_dict(want)
conn.close()