--- /dev/null
+"""
+Support module for test_connection[_async].py
+"""
+
+from typing import Any, List
+from dataclasses import dataclass
+
+import pytest
+import psycopg
+
+
+@pytest.fixture
+def testctx(svcconn):
+ svcconn.execute("create table if not exists testctx (id int primary key)")
+ svcconn.execute("delete from testctx")
+ return None
+
+
+@dataclass
+class ParamDef:
+ name: str
+ guc: str
+ values: List[Any]
+ non_default: str
+
+
+param_isolation = ParamDef(
+ name="isolation_level",
+ guc="isolation",
+ values=list(psycopg.IsolationLevel),
+ non_default="serializable",
+)
+param_read_only = ParamDef(
+ name="read_only",
+ guc="read_only",
+ values=[True, False],
+ non_default="on",
+)
+param_deferrable = ParamDef(
+ name="deferrable",
+ guc="deferrable",
+ values=[True, False],
+ non_default="on",
+)
+
+# Map Python values to Postgres values for the tx_params possible values
+tx_values_map = {
+ v.name.lower().replace("_", " "): v.value for v in psycopg.IsolationLevel
+}
+tx_values_map["on"] = True
+tx_values_map["off"] = False
+
+
+tx_params = [
+ param_isolation,
+ param_read_only,
+ pytest.param(param_deferrable, marks=pytest.mark.crdb_skip("deferrable")),
+]
+tx_params_isolation = [
+ pytest.param(
+ param_isolation,
+ id="isolation_level",
+ marks=pytest.mark.crdb("skip", reason="transaction isolation"),
+ ),
+ pytest.param(
+ param_read_only, id="read_only", marks=pytest.mark.crdb_skip("begin_read_only")
+ ),
+ pytest.param(
+ param_deferrable, id="deferrable", marks=pytest.mark.crdb_skip("deferrable")
+ ),
+]
+
+
+conninfo_params_timeout = [
+ (
+ "",
+ {"dbname": "mydb", "connect_timeout": None},
+ ({"dbname": "mydb"}, None),
+ ),
+ (
+ "",
+ {"dbname": "mydb", "connect_timeout": 1},
+ ({"dbname": "mydb", "connect_timeout": "1"}, 1),
+ ),
+ (
+ "dbname=postgres",
+ {},
+ ({"dbname": "postgres"}, None),
+ ),
+ (
+ "dbname=postgres connect_timeout=2",
+ {},
+ ({"dbname": "postgres", "connect_timeout": "2"}, 2),
+ ),
+ (
+ "postgresql:///postgres?connect_timeout=2",
+ {"connect_timeout": 10},
+ ({"dbname": "postgres", "connect_timeout": "10"}, 10),
+ ),
+]
import logging
import weakref
from typing import Any, List
-from dataclasses import dataclass
import psycopg
from psycopg import Notify, pq, errors as e
from .utils import gc_collect
from ._test_cursor import my_row_factory
+from ._test_connection import tx_params, tx_params_isolation, tx_values_map
+from ._test_connection import conninfo_params_timeout
+from ._test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
conn.close()
+def test_connect_bad(conn_cls):
+ with pytest.raises(psycopg.OperationalError):
+ conn_cls.connect("dbname=nosuchdb")
+
+
def test_connect_str_subclass(conn_cls, dsn):
class MyString(str):
pass
conn.close()
-def test_connect_bad(conn_cls):
- with pytest.raises(psycopg.OperationalError):
- conn_cls.connect("dbname=nosuchdb")
-
-
@pytest.mark.slow
@pytest.mark.timing
def test_connect_timeout(conn_cls, deaf_port):
with pytest.raises(psycopg.OperationalError):
with conn.cursor("foo"):
pass
+ with pytest.raises(psycopg.OperationalError):
+ conn.cursor("foo")
with pytest.raises(psycopg.OperationalError):
conn.cursor()
assert not recwarn, [str(w.message) for w in recwarn.list]
-@pytest.fixture
-def testctx(svcconn):
- svcconn.execute("create table if not exists testctx (id int primary key)")
- svcconn.execute("delete from testctx")
- return None
-
-
-def test_context_commit(conn_cls, testctx, conn, dsn):
+@pytest.mark.usefixtures("testctx")
+def test_context_commit(conn_cls, conn, dsn):
with conn:
with conn.cursor() as cur:
cur.execute("insert into testctx values (42)")
assert cur.fetchall() == [(42,)]
-def test_context_rollback(conn_cls, testctx, conn, dsn):
+@pytest.mark.usefixtures("testctx")
+def test_context_rollback(conn_cls, conn, dsn):
with pytest.raises(ZeroDivisionError):
with conn:
with conn.cursor() as cur:
assert isinstance(cur, MyServerCursor)
-@dataclass
-class ParamDef:
- name: str
- guc: str
- values: List[Any]
- non_default: str
-
-
-param_isolation = ParamDef(
- name="isolation_level",
- guc="isolation",
- values=list(psycopg.IsolationLevel),
- non_default="serializable",
-)
-param_read_only = ParamDef(
- name="read_only",
- guc="read_only",
- values=[True, False],
- non_default="on",
-)
-param_deferrable = ParamDef(
- name="deferrable",
- guc="deferrable",
- values=[True, False],
- non_default="on",
-)
-
-# Map Python values to Postgres values for the tx_params possible values
-tx_values_map = {
- v.name.lower().replace("_", " "): v.value for v in psycopg.IsolationLevel
-}
-tx_values_map["on"] = True
-tx_values_map["off"] = False
-
-
-tx_params = [
- param_isolation,
- param_read_only,
- pytest.param(param_deferrable, marks=pytest.mark.crdb_skip("deferrable")),
-]
-tx_params_isolation = [
- pytest.param(
- param_isolation,
- id="isolation_level",
- marks=pytest.mark.crdb("skip", reason="transaction isolation"),
- ),
- pytest.param(
- param_read_only, id="read_only", marks=pytest.mark.crdb_skip("begin_read_only")
- ),
- pytest.param(
- param_deferrable, id="deferrable", marks=pytest.mark.crdb_skip("deferrable")
- ),
-]
-
-
@pytest.mark.parametrize("param", tx_params)
def test_transaction_param_default(conn, param):
assert getattr(conn, param.name) is None
assert conn.deferrable is False
-conninfo_params_timeout = [
- (
- "",
- {"dbname": "mydb", "connect_timeout": None},
- ({"dbname": "mydb"}, None),
- ),
- (
- "",
- {"dbname": "mydb", "connect_timeout": 1},
- ({"dbname": "mydb", "connect_timeout": "1"}, 1),
- ),
- (
- "dbname=postgres",
- {},
- ({"dbname": "postgres"}, None),
- ),
- (
- "dbname=postgres connect_timeout=2",
- {},
- ({"dbname": "postgres", "connect_timeout": "2"}, 2),
- ),
- (
- "postgresql:///postgres?connect_timeout=2",
- {"connect_timeout": 10},
- ({"dbname": "postgres", "connect_timeout": "10"}, 10),
- ),
-]
-
-
@pytest.mark.parametrize("dsn, kwargs, exp", conninfo_params_timeout)
def test_get_connection_params(conn_cls, dsn, kwargs, exp):
params = conn_cls._get_connection_params(dsn, **kwargs)
from .utils import gc_collect
from ._test_cursor import my_row_factory
-from .test_connection import tx_params, tx_params_isolation, tx_values_map
-from .test_connection import conninfo_params_timeout
-from .test_connection import testctx # noqa: F401 # fixture
+from ._test_connection import tx_params, tx_params_isolation, tx_values_map
+from ._test_connection import conninfo_params_timeout
+from ._test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
-from .test_conninfo import fake_resolve # noqa: F401
-
-pytestmark = pytest.mark.anyio
+from .test_conninfo import fake_resolve # noqa: F401 # fixture
async def test_connect(aconn_cls, dsn):
with pytest.raises(psycopg.OperationalError):
async with aconn.cursor("foo"):
pass
+ with pytest.raises(psycopg.OperationalError):
aconn.cursor("foo")
with pytest.raises(psycopg.OperationalError):
aconn.cursor()
conn = await aconn_cls.connect(dsn)
try:
await conn.execute("select wat")
- except Exception:
+ except psycopg.ProgrammingError:
pass
del conn
+ gc_collect()
assert "INERROR" in str(recwarn.pop(ResourceWarning).message)
async with await aconn_cls.connect(dsn) as conn:
@pytest.mark.crdb_skip("pg_terminate_backend")
async def test_context_inerror_rollback_no_clobber(aconn_cls, conn, dsn, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
with pytest.raises(ZeroDivisionError):
async with await aconn_cls.connect(dsn) as conn2:
await conn2.execute("select 1")
await cur.execute("meh")
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
+ with pytest.raises(psycopg.errors.InFailedSqlTransaction):
+ await cur.execute("select 1")
+
await aconn.commit()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.IDLE
await cur.execute("select * from foo")
aconn.cancel()
-async def test_resolve_hostaddr_conn(monkeypatch, fake_resolve): # noqa: F811
+@pytest.mark.usefixtures("fake_resolve")
+async def test_resolve_hostaddr_conn(aconn_cls, monkeypatch):
got = []
def fake_connect_gen(conninfo, **kwargs):
got.append(conninfo)
1 / 0
- monkeypatch.setattr(psycopg.AsyncConnection, "_connect_gen", fake_connect_gen)
+ monkeypatch.setattr(aconn_cls, "_connect_gen", fake_connect_gen)
with pytest.raises(ZeroDivisionError):
- await psycopg.AsyncConnection.connect("host=foo.com")
+ await aconn_cls.connect("host=foo.com")
assert len(got) == 1
want = {"host": "foo.com", "hostaddr": "1.1.1.1"}