assert type(ex.value) is want_ex
-# NOTE: these "tsa" tests assume that the database we use for tests is a primary.
-
-
+@pytest.mark.libpq(">= 14")
@pytest.mark.parametrize("mode", ["any", "read-write", "primary", "prefer-standby"])
def test_connect_tsa(conn_cls, dsn, mode):
+ # NOTE: assume that the test database is a "primary"
params = conninfo_to_dict(dsn, target_session_attrs=mode)
with conn_cls.connect(**params) as conn:
assert conn.pgconn.status == pq.ConnStatus.OK
+@pytest.mark.libpq(">= 14")
@pytest.mark.parametrize("mode", ["read-only", "standby", "nosuchmode"])
def test_connect_tsa_bad(conn_cls, dsn, mode):
+ # NOTE: assume that the test database is a "primary"
params = conninfo_to_dict(dsn, target_session_attrs=mode)
with pytest.raises(psycopg.OperationalError, match=mode):
conn_cls.connect(**params)
assert type(ex.value) is want_ex
-# NOTE: these "tsa" tests assume that the database we use for tests is a primary.
-
-
+@pytest.mark.libpq(">= 14")
@pytest.mark.parametrize("mode", ["any", "read-write", "primary", "prefer-standby"])
async def test_connect_tsa(aconn_cls, dsn, mode):
+ # NOTE: assume that the test database is a "primary"
params = conninfo_to_dict(dsn, target_session_attrs=mode)
async with await aconn_cls.connect(**params) as aconn:
assert aconn.pgconn.status == pq.ConnStatus.OK
+@pytest.mark.libpq(">= 14")
@pytest.mark.parametrize("mode", ["read-only", "standby", "nosuchmode"])
async def test_connect_tsa_bad(aconn_cls, dsn, mode):
+ # NOTE: assume that the test database is a "primary"
params = conninfo_to_dict(dsn, target_session_attrs=mode)
with pytest.raises(psycopg.OperationalError, match=mode):
await aconn_cls.connect(**params)