]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Implement the transaction control attributes
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 23 Jul 2021 14:28:58 +0000 (16:28 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 23 Jul 2021 14:39:44 +0000 (16:39 +0200)
psycopg/psycopg/_enums.py
psycopg/psycopg/connection.py
psycopg/psycopg/transaction.py
tests/test_connection.py
tests/test_connection_async.py

index 070fd1da50ecbfa701a2f82e72b02f284f2b9e41..5f553864f1812060535047551e72364cf4301e73 100644 (file)
@@ -7,7 +7,7 @@ libpq-defined enums.
 
 # Copyright (C) 2020-2021 The Psycopg Team
 
-from enum import Enum
+from enum import Enum, IntEnum
 
 from . import pq
 
@@ -38,7 +38,7 @@ class PyFormat(str, Enum):
         return _py2pg[fmt]
 
 
-class IsolationLevel(Enum):
+class IsolationLevel(IntEnum):
     """
     Enum representing the isolation level for a transaction.
     """
index 5c6c0ff67ec98da9cabe3e72f981ed54b2ddb3ba..0cdd558462da137a9ae544ed01440136236ae763 100644 (file)
@@ -469,7 +469,25 @@ class BaseConnection(Generic[Row]):
         if self.pgconn.transaction_status != TransactionStatus.IDLE:
             return
 
-        yield from self._exec_command(b"begin")
+        yield from self._exec_command(self._get_tx_start_command())
+
+    def _get_tx_start_command(self) -> bytes:
+        parts = [b"begin"]
+
+        if self.isolation_level is not None:
+            val = IsolationLevel(self.isolation_level)
+            parts.append(b"isolation level")
+            parts.append(val.name.lower().replace("_", " ").encode("utf8"))
+
+        if self.read_only is not None:
+            parts.append(b"read only" if self.read_only else b"read write")
+
+        if self.deferrable is not None:
+            parts.append(
+                b"deferrable" if self.deferrable else b"not deferrable"
+            )
+
+        return b" ".join(parts)
 
     def _commit_gen(self) -> PQGen[None]:
         """Generator implementing `Connection.commit()`."""
index 2804e1e6fcfd931dcb2b830e1de2d04106949e93..85a5c3a17ffa95ee5b8982fbc6f5e0016672f484 100644 (file)
@@ -99,7 +99,7 @@ class BaseTransaction(Generic[ConnectionType]):
         commands = []
         if self._outer_transaction:
             assert not self._conn._savepoints, self._conn._savepoints
-            commands.append(b"begin")
+            commands.append(self._conn._get_tx_start_command())
 
         if self._savepoint_name:
             commands.append(
index 3bf87b3dc41fe93f413019ab5acd184fee8036b8..c7c78cd4558aaaf045a01e16cd8446dae4d7f40f 100644 (file)
@@ -566,3 +566,107 @@ def test_server_cursor_factory(conn):
     conn.server_cursor_factory = MyServerCursor
     with conn.cursor(name="n") as cur:
         assert isinstance(cur, MyServerCursor)
+
+
+tx_params = {
+    "isolation_level": {
+        "guc": "isolation",
+        "values": list(psycopg.IsolationLevel),
+        "set_method": "set_isolation_level",
+    },
+    "read_only": {
+        "guc": "read_only",
+        "values": [True, False],
+        "set_method": "set_read_only",
+    },
+    "deferrable": {
+        "guc": "deferrable",
+        "values": [True, False],
+        "set_method": "set_deferrable",
+    },
+}
+
+# Map Python values to Postgres values for the tx_params possible values
+tx_values_map = {
+    v.name.lower().replace("_", " "): v.value for v in psycopg.IsolationLevel
+}
+tx_values_map["on"] = True
+tx_values_map["off"] = False
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_transaction_param_default(conn, attr):
+    assert getattr(conn, attr) is None
+    guc = tx_params[attr]["guc"]
+    current, default = conn.execute(
+        "select current_setting(%s), current_setting(%s)",
+        [f"transaction_{guc}", f"default_transaction_{guc}"],
+    ).fetchone()
+    assert current == default
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_implicit(conn, attr, autocommit):
+    guc = tx_params[attr]["guc"]
+    conn.autocommit = autocommit
+    for value in tx_params[attr]["values"]:
+        setattr(conn, attr, value)
+        pgval, default = conn.execute(
+            "select current_setting(%s), current_setting(%s)",
+            [f"transaction_{guc}", f"default_transaction_{guc}"],
+        ).fetchone()
+        if autocommit:
+            assert pgval == default
+        else:
+            assert tx_values_map[pgval] == value
+        conn.rollback()
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_block(conn, attr, autocommit):
+    guc = tx_params[attr]["guc"]
+    conn.autocommit = autocommit
+    for value in tx_params[attr]["values"]:
+        setattr(conn, attr, value)
+        with conn.transaction():
+            pgval = conn.execute(
+                "select current_setting(%s)", [f"transaction_{guc}"]
+            ).fetchone()[0]
+        assert tx_values_map[pgval] == value
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_not_intrans_implicit(conn, attr):
+    conn.execute("select 1")
+    with pytest.raises(psycopg.ProgrammingError):
+        setattr(conn, attr, tx_params[attr]["values"][0])
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_not_intrans_block(conn, attr):
+    with conn.transaction():
+        with pytest.raises(psycopg.ProgrammingError):
+            setattr(conn, attr, tx_params[attr]["values"][0])
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+def test_set_transaction_param_not_intrans_external(conn, attr):
+    conn.autocommit = True
+    conn.execute("begin")
+    with pytest.raises(psycopg.ProgrammingError):
+        setattr(conn, attr, tx_params[attr]["values"][0])
+
+
+def test_set_transaction_param_all(conn):
+    for attr in tx_params:
+        value = tx_params[attr]["values"][0]
+        setattr(conn, attr, value)
+
+    for attr in tx_params:
+        guc = tx_params[attr]["guc"]
+        pgval = conn.execute(
+            "select current_setting(%s)", [f"transaction_{guc}"]
+        ).fetchone()[0]
+        assert tx_values_map[pgval] == value
index eb95787833df5f4dc7cfd55e9c1cefaefca09c34..0e13dff0d6f8af15798f465744425f2be4421f0f 100644 (file)
@@ -14,6 +14,7 @@ from psycopg.conninfo import conninfo_to_dict
 
 from .utils import gc_collect
 from .test_cursor import my_row_factory
+from .test_connection import tx_params, tx_values_map
 
 pytestmark = pytest.mark.asyncio
 
@@ -584,3 +585,94 @@ async def test_server_cursor_factory(aconn):
     aconn.server_cursor_factory = MyServerCursor
     async with aconn.cursor(name="n") as cur:
         assert isinstance(cur, MyServerCursor)
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_transaction_param_default(aconn, attr):
+    assert getattr(aconn, attr) is None
+    guc = tx_params[attr]["guc"]
+    cur = await aconn.execute(
+        "select current_setting(%s), current_setting(%s)",
+        [f"transaction_{guc}", f"default_transaction_{guc}"],
+    )
+    current, default = await cur.fetchone()
+    assert current == default
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_transaction_param_readonly_property(aconn, attr):
+    with pytest.raises(AttributeError):
+        setattr(aconn, attr, None)
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_implicit(aconn, attr, autocommit):
+    guc = tx_params[attr]["guc"]
+    await aconn.set_autocommit(autocommit)
+    for value in tx_params[attr]["values"]:
+        await getattr(aconn, tx_params[attr]["set_method"])(value)
+        cur = await aconn.execute(
+            "select current_setting(%s), current_setting(%s)",
+            [f"transaction_{guc}", f"default_transaction_{guc}"],
+        )
+        pgval, default = await cur.fetchone()
+        if autocommit:
+            assert pgval == default
+        else:
+            assert tx_values_map[pgval] == value
+        await aconn.rollback()
+
+
+@pytest.mark.parametrize("autocommit", [True, False])
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_block(aconn, attr, autocommit):
+    guc = tx_params[attr]["guc"]
+    await aconn.set_autocommit(autocommit)
+    for value in tx_params[attr]["values"]:
+        await getattr(aconn, tx_params[attr]["set_method"])(value)
+        async with aconn.transaction():
+            cur = await aconn.execute(
+                "select current_setting(%s)", [f"transaction_{guc}"]
+            )
+            pgval = (await cur.fetchone())[0]
+        assert tx_values_map[pgval] == value
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_not_intrans_implicit(aconn, attr):
+    await aconn.execute("select 1")
+    value = tx_params[attr]["values"][0]
+    with pytest.raises(psycopg.ProgrammingError):
+        await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_not_intrans_block(aconn, attr):
+    value = tx_params[attr]["values"][0]
+    async with aconn.transaction():
+        with pytest.raises(psycopg.ProgrammingError):
+            await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+
+@pytest.mark.parametrize("attr", list(tx_params))
+async def test_set_transaction_param_not_intrans_external(aconn, attr):
+    value = tx_params[attr]["values"][0]
+    await aconn.set_autocommit(True)
+    await aconn.execute("begin")
+    with pytest.raises(psycopg.ProgrammingError):
+        await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+
+async def test_set_transaction_param_all(aconn):
+    for attr in tx_params:
+        value = tx_params[attr]["values"][0]
+        await getattr(aconn, tx_params[attr]["set_method"])(value)
+
+    for attr in tx_params:
+        guc = tx_params[attr]["guc"]
+        cur = await aconn.execute(
+            "select current_setting(%s)", [f"transaction_{guc}"]
+        )
+        pgval = (await cur.fetchone())[0]
+        assert tx_values_map[pgval] == value