# Copyright (C) 2020-2021 The Psycopg Team
-from enum import Enum
+from enum import Enum, IntEnum
from . import pq
return _py2pg[fmt]
-class IsolationLevel(Enum):
+class IsolationLevel(IntEnum):
"""
Enum representing the isolation level for a transaction.
"""
if self.pgconn.transaction_status != TransactionStatus.IDLE:
return
- yield from self._exec_command(b"begin")
+ yield from self._exec_command(self._get_tx_start_command())
+
+ def _get_tx_start_command(self) -> bytes:
+ parts = [b"begin"]
+
+ if self.isolation_level is not None:
+ val = IsolationLevel(self.isolation_level)
+ parts.append(b"isolation level")
+ parts.append(val.name.lower().replace("_", " ").encode("utf8"))
+
+ if self.read_only is not None:
+ parts.append(b"read only" if self.read_only else b"read write")
+
+ if self.deferrable is not None:
+ parts.append(
+ b"deferrable" if self.deferrable else b"not deferrable"
+ )
+
+ return b" ".join(parts)
def _commit_gen(self) -> PQGen[None]:
"""Generator implementing `Connection.commit()`."""
commands = []
if self._outer_transaction:
assert not self._conn._savepoints, self._conn._savepoints
- commands.append(b"begin")
+ commands.append(self._conn._get_tx_start_command())
if self._savepoint_name:
commands.append(
conn.server_cursor_factory = MyServerCursor
with conn.cursor(name="n") as cur:
assert isinstance(cur, MyServerCursor)
+
+
+tx_params = {
+ "isolation_level": {
+ "guc": "isolation",
+ "values": list(psycopg.IsolationLevel),
+ "set_method": "set_isolation_level",
+ },
+ "read_only": {
+ "guc": "read_only",
+ "values": [True, False],
+ "set_method": "set_read_only",
+ },
+ "deferrable": {
+ "guc": "deferrable",
+ "values": [True, False],
+ "set_method": "set_deferrable",
+ },
+}
+
+# Map Python values to Postgres values for the tx_params possible values
+tx_values_map = {
+ v.name.lower().replace("_", " "): v.value for v in psycopg.IsolationLevel
+}
+tx_values_map["on"] = True
+tx_values_map["off"] = False
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_transaction_param_default(conn, attr):
+ assert getattr(conn, attr) is None
+ guc = tx_params[attr]["guc"]
+ current, default = conn.execute(
+ "select current_setting(%s), current_setting(%s)",
+ [f"transaction_{guc}", f"default_transaction_{guc}"],
+ ).fetchone()
+ assert current == default
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_implicit(conn, attr, autocommit):
+ guc = tx_params[attr]["guc"]
+ conn.autocommit = autocommit
+ for value in tx_params[attr]["values"]:
+ setattr(conn, attr, value)
+ pgval, default = conn.execute(
+ "select current_setting(%s), current_setting(%s)",
+ [f"transaction_{guc}", f"default_transaction_{guc}"],
+ ).fetchone()
+ if autocommit:
+ assert pgval == default
+ else:
+ assert tx_values_map[pgval] == value
+ conn.rollback()
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_block(conn, attr, autocommit):
+ guc = tx_params[attr]["guc"]
+ conn.autocommit = autocommit
+ for value in tx_params[attr]["values"]:
+ setattr(conn, attr, value)
+ with conn.transaction():
+ pgval = conn.execute(
+ "select current_setting(%s)", [f"transaction_{guc}"]
+ ).fetchone()[0]
+ assert tx_values_map[pgval] == value
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_not_intrans_implicit(conn, attr):
+ conn.execute("select 1")
+ with pytest.raises(psycopg.ProgrammingError):
+ setattr(conn, attr, tx_params[attr]["values"][0])
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_not_intrans_block(conn, attr):
+ with conn.transaction():
+ with pytest.raises(psycopg.ProgrammingError):
+ setattr(conn, attr, tx_params[attr]["values"][0])
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_not_intrans_external(conn, attr):
+ conn.autocommit = True
+ conn.execute("begin")
+ with pytest.raises(psycopg.ProgrammingError):
+ setattr(conn, attr, tx_params[attr]["values"][0])
+
+
+def test_set_transaction_param_all(conn):
+ for attr in tx_params:
+ value = tx_params[attr]["values"][0]
+ setattr(conn, attr, value)
+
+ for attr in tx_params:
+ guc = tx_params[attr]["guc"]
+ pgval = conn.execute(
+ "select current_setting(%s)", [f"transaction_{guc}"]
+ ).fetchone()[0]
+ assert tx_values_map[pgval] == value
from .utils import gc_collect
from .test_cursor import my_row_factory
+from .test_connection import tx_params, tx_values_map
pytestmark = pytest.mark.asyncio
aconn.server_cursor_factory = MyServerCursor
async with aconn.cursor(name="n") as cur:
assert isinstance(cur, MyServerCursor)
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_transaction_param_default(aconn, attr):
+ assert getattr(aconn, attr) is None
+ guc = tx_params[attr]["guc"]
+ cur = await aconn.execute(
+ "select current_setting(%s), current_setting(%s)",
+ [f"transaction_{guc}", f"default_transaction_{guc}"],
+ )
+ current, default = await cur.fetchone()
+ assert current == default
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_transaction_param_readonly_property(aconn, attr):
+ with pytest.raises(AttributeError):
+ setattr(aconn, attr, None)
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_implicit(aconn, attr, autocommit):
+ guc = tx_params[attr]["guc"]
+ await aconn.set_autocommit(autocommit)
+ for value in tx_params[attr]["values"]:
+ await getattr(aconn, tx_params[attr]["set_method"])(value)
+ cur = await aconn.execute(
+ "select current_setting(%s), current_setting(%s)",
+ [f"transaction_{guc}", f"default_transaction_{guc}"],
+ )
+ pgval, default = await cur.fetchone()
+ if autocommit:
+ assert pgval == default
+ else:
+ assert tx_values_map[pgval] == value
+ await aconn.rollback()
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_block(aconn, attr, autocommit):
+ guc = tx_params[attr]["guc"]
+ await aconn.set_autocommit(autocommit)
+ for value in tx_params[attr]["values"]:
+ await getattr(aconn, tx_params[attr]["set_method"])(value)
+ async with aconn.transaction():
+ cur = await aconn.execute(
+ "select current_setting(%s)", [f"transaction_{guc}"]
+ )
+ pgval = (await cur.fetchone())[0]
+ assert tx_values_map[pgval] == value
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_not_intrans_implicit(aconn, attr):
+ await aconn.execute("select 1")
+ value = tx_params[attr]["values"][0]
+ with pytest.raises(psycopg.ProgrammingError):
+ await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_not_intrans_block(aconn, attr):
+ value = tx_params[attr]["values"][0]
+ async with aconn.transaction():
+ with pytest.raises(psycopg.ProgrammingError):
+ await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_not_intrans_external(aconn, attr):
+ value = tx_params[attr]["values"][0]
+ await aconn.set_autocommit(True)
+ await aconn.execute("begin")
+ with pytest.raises(psycopg.ProgrammingError):
+ await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+
+async def test_set_transaction_param_all(aconn):
+ for attr in tx_params:
+ value = tx_params[attr]["values"][0]
+ await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+ for attr in tx_params:
+ guc = tx_params[attr]["guc"]
+ cur = await aconn.execute(
+ "select current_setting(%s)", [f"transaction_{guc}"]
+ )
+ pgval = (await cur.fetchone())[0]
+ assert tx_values_map[pgval] == value