"copy": 41608,
"cursor with hold": 77101,
"deferrable": 48307,
+ "do": 17511,
"encoding": 35882,
"hstore": 41284,
"infinity date": 41564,
"nested array": 32552,
"notify": 41522,
"password_encryption": 42519,
+ "pg_terminate_backend": 35897,
"range": 41282,
"scroll cursor": 77102,
"stored procedure": 1751,
-import sys
import time
import pytest
import logging
import weakref
from typing import Any, List
-
-if sys.version_info >= (3, 8):
- from typing import TypedDict
-else:
- from typing_extensions import TypedDict
+from dataclasses import dataclass
import psycopg
from psycopg import Connection, Notify, errors as e
cur.execute("select 1")
+@pytest.mark.crdb("skip", reason="pg_terminate_backend")
def test_broken(conn):
with pytest.raises(psycopg.OperationalError):
conn.execute("select pg_terminate_backend(%s)", [conn.pgconn.backend_pid])
conn.close()
+@pytest.mark.crdb("skip", reason="pg_terminate_backend")
def test_context_inerror_rollback_no_clobber(conn, dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
assert "in rollback" in rec.message
+@pytest.mark.crdb("skip", reason="copy")
def test_context_active_rollback_no_clobber(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
with pytest.raises(ZeroDivisionError):
with psycopg.connect(dsn) as conn:
conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
+ assert not conn.pgconn.error_message
status = conn.info.transaction_status
assert status == conn.TransactionStatus.ACTIVE
1 / 0
conn.commit()
+@pytest.mark.crdb("skip", reason="deferrable")
def test_commit_error(conn):
conn.execute(
"""
psycopg.Connection.connect(*args, **kwargs)
+@pytest.mark.crdb("skip", reason="pg_terminate_backend")
def test_broken_connection(conn):
cur = conn.cursor()
with pytest.raises(psycopg.DatabaseError):
assert conn.closed
+@pytest.mark.crdb("skip", reason="do")
def test_notice_handlers(conn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
messages = []
conn.remove_notice_handler(cb1)
+@pytest.mark.crdb("skip", reason="notify")
def test_notify_handlers(conn):
nots1 = []
nots2 = []
assert isinstance(cur, MyServerCursor)
-class ParamDef(TypedDict):
+@dataclass
+class ParamDef:
+ name: str
guc: str
values: List[Any]
- set_method: str
-tx_params = {
- "isolation_level": ParamDef(
- {
- "guc": "isolation",
- "values": list(psycopg.IsolationLevel),
- "set_method": "set_isolation_level",
- }
- ),
- "read_only": ParamDef(
- {
- "guc": "read_only",
- "values": [True, False],
- "set_method": "set_read_only",
- }
- ),
- "deferrable": ParamDef(
- {
- "guc": "deferrable",
- "values": [True, False],
- "set_method": "set_deferrable",
- }
- ),
-}
+param_isolation = ParamDef(
+ name="isolation_level",
+ guc="isolation",
+ values=list(psycopg.IsolationLevel),
+)
+param_read_only = ParamDef(
+ name="read_only",
+ guc="read_only",
+ values=[True, False],
+)
+param_deferrable = ParamDef(
+ name="deferrable",
+ guc="deferrable",
+ values=[True, False],
+)
# Map Python values to Postgres values for the tx_params possible values
tx_values_map = {
tx_values_map["off"] = False
-@pytest.mark.parametrize("attr", list(tx_params))
-def test_transaction_param_default(conn, attr):
- assert getattr(conn, attr) is None
- guc = tx_params[attr]["guc"]
+tx_params = [
+ param_isolation,
+ param_read_only,
+ pytest.param(param_deferrable, marks=pytest.mark.crdb("skip", reason="deferrable")),
+]
+tx_params_isolation = [
+ pytest.param(
+ param_isolation,
+ marks=pytest.mark.crdb("skip", reason="transaction isolation"),
+ ),
+ param_read_only,
+ pytest.param(param_deferrable, marks=pytest.mark.crdb("skip", reason="deferrable")),
+]
+
+
+@pytest.mark.parametrize("param", tx_params)
+def test_transaction_param_default(conn, param):
+ assert getattr(conn, param.name) is None
current, default = conn.execute(
"select current_setting(%s), current_setting(%s)",
- [f"transaction_{guc}", f"default_transaction_{guc}"],
+ [f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
).fetchone()
assert current == default
@pytest.mark.parametrize("autocommit", [True, False])
-@pytest.mark.parametrize("attr", list(tx_params))
-def test_set_transaction_param_implicit(conn, attr, autocommit):
- guc = tx_params[attr]["guc"]
+@pytest.mark.parametrize("param", tx_params_isolation)
+def test_set_transaction_param_implicit(conn, param, autocommit):
conn.autocommit = autocommit
- for value in tx_params[attr]["values"]:
- setattr(conn, attr, value)
+ for value in param.values:
+ setattr(conn, param.name, value)
pgval, default = conn.execute(
"select current_setting(%s), current_setting(%s)",
- [f"transaction_{guc}", f"default_transaction_{guc}"],
+ [f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
).fetchone()
if autocommit:
assert pgval == default
@pytest.mark.parametrize("autocommit", [True, False])
-@pytest.mark.parametrize("attr", list(tx_params))
-def test_set_transaction_param_block(conn, attr, autocommit):
- guc = tx_params[attr]["guc"]
+@pytest.mark.parametrize("param", tx_params_isolation)
+def test_set_transaction_param_block(conn, param, autocommit):
conn.autocommit = autocommit
- for value in tx_params[attr]["values"]:
- setattr(conn, attr, value)
+ for value in param.values:
+ setattr(conn, param.name, value)
with conn.transaction():
pgval = conn.execute(
- "select current_setting(%s)", [f"transaction_{guc}"]
+ "select current_setting(%s)", [f"transaction_{param.guc}"]
).fetchone()[0]
assert tx_values_map[pgval] == value
-@pytest.mark.parametrize("attr", list(tx_params))
-def test_set_transaction_param_not_intrans_implicit(conn, attr):
+@pytest.mark.parametrize("param", tx_params)
+def test_set_transaction_param_not_intrans_implicit(conn, param):
conn.execute("select 1")
with pytest.raises(psycopg.ProgrammingError):
- setattr(conn, attr, tx_params[attr]["values"][0])
+ setattr(conn, param.name, param.values[0])
-@pytest.mark.parametrize("attr", list(tx_params))
-def test_set_transaction_param_not_intrans_block(conn, attr):
+@pytest.mark.parametrize("param", tx_params)
+def test_set_transaction_param_not_intrans_block(conn, param):
with conn.transaction():
with pytest.raises(psycopg.ProgrammingError):
- setattr(conn, attr, tx_params[attr]["values"][0])
+ setattr(conn, param.name, param.values[0])
-@pytest.mark.parametrize("attr", list(tx_params))
-def test_set_transaction_param_not_intrans_external(conn, attr):
+@pytest.mark.parametrize("param", tx_params)
+def test_set_transaction_param_not_intrans_external(conn, param):
conn.autocommit = True
conn.execute("begin")
with pytest.raises(psycopg.ProgrammingError):
- setattr(conn, attr, tx_params[attr]["values"][0])
+ setattr(conn, param.name, param.values[0])
+@pytest.mark.crdb("skip", reason="transaction isolation")
def test_set_transaction_param_all(conn):
- for attr in tx_params:
- value = tx_params[attr]["values"][0]
- setattr(conn, attr, value)
+ params: List[Any] = tx_params[:]
+ params[2] = params[2].values[0]
+
+ for param in params:
+ value = param.values[0]
+ setattr(conn, param.name, value)
- for attr in tx_params:
- guc = tx_params[attr]["guc"]
+ for param in params:
pgval = conn.execute(
- "select current_setting(%s)", [f"transaction_{guc}"]
+ "select current_setting(%s)", [f"transaction_{param.guc}"]
).fetchone()[0]
assert tx_values_map[pgval] == value
import pytest
import logging
import weakref
+from typing import List, Any
import psycopg
from psycopg import AsyncConnection, Notify, errors as e
from .utils import gc_collect
from .test_cursor import my_row_factory
-from .test_connection import tx_params, tx_values_map, conninfo_params_timeout
+from .test_connection import tx_params, tx_params_isolation, tx_values_map
+from .test_connection import conninfo_params_timeout
from .test_adapt import make_bin_dumper, make_dumper
from .test_conninfo import fake_resolve # noqa: F401
await cur.execute("select 1")
+@pytest.mark.crdb("skip", reason="pg_terminate_backend")
async def test_broken(aconn):
with pytest.raises(psycopg.OperationalError):
await aconn.execute(
await aconn.close()
+@pytest.mark.crdb("skip", reason="pg_terminate_backend")
async def test_context_inerror_rollback_no_clobber(conn, dsn, caplog):
with pytest.raises(ZeroDivisionError):
async with await psycopg.AsyncConnection.connect(dsn) as conn2:
assert "in rollback" in rec.message
+@pytest.mark.crdb("skip", reason="copy")
async def test_context_active_rollback_no_clobber(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
with pytest.raises(ZeroDivisionError):
async with await psycopg.AsyncConnection.connect(dsn) as conn:
conn.pgconn.exec_(b"copy (select generate_series(1, 10)) to stdout")
+ assert not conn.pgconn.error_message
status = conn.info.transaction_status
assert status == conn.TransactionStatus.ACTIVE
1 / 0
await aconn.commit()
+@pytest.mark.crdb("skip", reason="deferrable")
async def test_commit_error(aconn):
await aconn.execute(
"""
await psycopg.AsyncConnection.connect(*args, **kwargs)
+@pytest.mark.crdb("skip", reason="pg_terminate_backend")
async def test_broken_connection(aconn):
cur = aconn.cursor()
with pytest.raises(psycopg.DatabaseError):
assert aconn.closed
+@pytest.mark.crdb("skip", reason="do")
async def test_notice_handlers(aconn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg")
messages = []
aconn.remove_notice_handler(cb1)
+@pytest.mark.crdb("skip", reason="notify")
async def test_notify_handlers(aconn):
nots1 = []
nots2 = []
assert isinstance(cur, MyServerCursor)
-@pytest.mark.parametrize("attr", list(tx_params))
-async def test_transaction_param_default(aconn, attr):
- assert getattr(aconn, attr) is None
- guc = tx_params[attr]["guc"]
+@pytest.mark.parametrize("param", tx_params)
+async def test_transaction_param_default(aconn, param):
+ assert getattr(aconn, param.name) is None
cur = await aconn.execute(
"select current_setting(%s), current_setting(%s)",
- [f"transaction_{guc}", f"default_transaction_{guc}"],
+ [f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
)
current, default = await cur.fetchone()
assert current == default
-@pytest.mark.parametrize("attr", list(tx_params))
-async def test_transaction_param_readonly_property(aconn, attr):
+@pytest.mark.parametrize("param", tx_params)
+async def test_transaction_param_readonly_property(aconn, param):
with pytest.raises(AttributeError):
- setattr(aconn, attr, None)
+ setattr(aconn, param.name, None)
@pytest.mark.parametrize("autocommit", [True, False])
-@pytest.mark.parametrize("attr", list(tx_params))
-async def test_set_transaction_param_implicit(aconn, attr, autocommit):
- guc = tx_params[attr]["guc"]
+@pytest.mark.parametrize("param", tx_params_isolation)
+async def test_set_transaction_param_implicit(aconn, param, autocommit):
await aconn.set_autocommit(autocommit)
- for value in tx_params[attr]["values"]:
- await getattr(aconn, tx_params[attr]["set_method"])(value)
+ for value in param.values:
+ await getattr(aconn, f"set_{param.name}")(value)
cur = await aconn.execute(
"select current_setting(%s), current_setting(%s)",
- [f"transaction_{guc}", f"default_transaction_{guc}"],
+ [f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
)
pgval, default = await cur.fetchone()
if autocommit:
@pytest.mark.parametrize("autocommit", [True, False])
-@pytest.mark.parametrize("attr", list(tx_params))
-async def test_set_transaction_param_block(aconn, attr, autocommit):
- guc = tx_params[attr]["guc"]
+@pytest.mark.parametrize("param", tx_params_isolation)
+async def test_set_transaction_param_block(aconn, param, autocommit):
await aconn.set_autocommit(autocommit)
- for value in tx_params[attr]["values"]:
- await getattr(aconn, tx_params[attr]["set_method"])(value)
+ for value in param.values:
+ await getattr(aconn, f"set_{param.name}")(value)
async with aconn.transaction():
cur = await aconn.execute(
- "select current_setting(%s)", [f"transaction_{guc}"]
+ "select current_setting(%s)", [f"transaction_{param.guc}"]
)
pgval = (await cur.fetchone())[0]
assert tx_values_map[pgval] == value
-@pytest.mark.parametrize("attr", list(tx_params))
-async def test_set_transaction_param_not_intrans_implicit(aconn, attr):
+@pytest.mark.parametrize("param", tx_params)
+async def test_set_transaction_param_not_intrans_implicit(aconn, param):
await aconn.execute("select 1")
- value = tx_params[attr]["values"][0]
+ value = param.values[0]
with pytest.raises(psycopg.ProgrammingError):
- await getattr(aconn, tx_params[attr]["set_method"])(value)
+ await getattr(aconn, f"set_{param.name}")(value)
-@pytest.mark.parametrize("attr", list(tx_params))
-async def test_set_transaction_param_not_intrans_block(aconn, attr):
- value = tx_params[attr]["values"][0]
+@pytest.mark.parametrize("param", tx_params)
+async def test_set_transaction_param_not_intrans_block(aconn, param):
+ value = param.values[0]
async with aconn.transaction():
with pytest.raises(psycopg.ProgrammingError):
- await getattr(aconn, tx_params[attr]["set_method"])(value)
+ await getattr(aconn, f"set_{param.name}")(value)
-@pytest.mark.parametrize("attr", list(tx_params))
-async def test_set_transaction_param_not_intrans_external(aconn, attr):
- value = tx_params[attr]["values"][0]
+@pytest.mark.parametrize("param", tx_params)
+async def test_set_transaction_param_not_intrans_external(aconn, param):
+ value = param.values[0]
await aconn.set_autocommit(True)
await aconn.execute("begin")
with pytest.raises(psycopg.ProgrammingError):
- await getattr(aconn, tx_params[attr]["set_method"])(value)
+ await getattr(aconn, f"set_{param.name}")(value)
+@pytest.mark.crdb("skip", reason="transaction isolation")
async def test_set_transaction_param_all(aconn):
- for attr in tx_params:
- value = tx_params[attr]["values"][0]
- await getattr(aconn, tx_params[attr]["set_method"])(value)
+ params: List[Any] = tx_params[:]
+ params[2] = params[2].values[0]
- for attr in tx_params:
- guc = tx_params[attr]["guc"]
- cur = await aconn.execute("select current_setting(%s)", [f"transaction_{guc}"])
+ for param in params:
+ value = param.values[0]
+ await getattr(aconn, f"set_{param.name}")(value)
+
+ for param in params:
+ cur = await aconn.execute(
+ "select current_setting(%s)", [f"transaction_{param.guc}"]
+ )
pgval = (await cur.fetchone())[0]
assert tx_values_map[pgval] == value