ones: you should call `!await` `~AsyncConnection.set_autocommit`
:samp:`({value})` instead.
+ .. automethod:: set_autocommit
+
+ .. versionadded:: 3.2
+
The following three properties control the characteristics of new
transactions. See :ref:`transaction-characteristics` for details.
.. __: https://www.postgresql.org/docs/current/runtime-config-client.html
#GUC-DEFAULT-TRANSACTION-ISOLATION
+ .. automethod:: set_isolation_level
+
+ .. versionadded:: 3.2
+
.. autoattribute:: read_only
`!None` means use the default set in the default_transaction_read_only__
.. __: https://www.postgresql.org/docs/current/runtime-config-client.html
#GUC-DEFAULT-TRANSACTION-READ-ONLY
+ .. automethod:: set_read_only
+
+ .. versionadded:: 3.2
+
.. autoattribute:: deferrable
`!None` means use the default set in the default_transaction_deferrable__
.. __: https://www.postgresql.org/docs/current/runtime-config-client.html
#GUC-DEFAULT-TRANSACTION-DEFERRABLE
+ .. automethod:: set_deferrable
+
+ .. versionadded:: 3.2
+
.. rubric:: Checking and configuring the connection state
can only be changed if there isn't a transaction already active on the
connection.
+.. versionadded:: 3.2
+ Added methods equivalent to setting the properties (such as
+ `~Connection.set_isolation_level()`) on sync connections too.
+
.. warning::
Applications running at `~IsolationLevel.REPEATABLE_READ` or
(:ticket:`#332`).
- Add :ref:`raw-query-cursors` to execute queries using placeholders in
PostgreSQL format (`$1`, `$2`...) (:ticket:`#560`).
+- Add `~Connection.set_autocommit()` on sync connections, and similar
+ transaction control methods available on the async connections.
- Add support for libpq functions to close prepared statements and portals
introduced in libpq v17 (:ticket:`#603`).
- Disable receiving more than one result on the same cursor in pipeline mode,
return waiting.wait_conn(gen, timeout=timeout)
def _set_autocommit(self, value: bool) -> None:
+ self.set_autocommit(value)
+
+ def set_autocommit(self, value: bool) -> None:
+ """Method version of the `~Connection.autocommit` setter."""
with self.lock:
self.wait(self._set_autocommit_gen(value))
def _set_isolation_level(self, value: Optional[IsolationLevel]) -> None:
+ self.set_isolation_level(value)
+
+ def set_isolation_level(self, value: Optional[IsolationLevel]) -> None:
+ """Method version of the `~Connection.isolation_level` setter."""
with self.lock:
self.wait(self._set_isolation_level_gen(value))
def _set_read_only(self, value: Optional[bool]) -> None:
+ self.set_read_only(value)
+
+ def set_read_only(self, value: Optional[bool]) -> None:
+ """Method version of the `~Connection.read_only` setter."""
with self.lock:
self.wait(self._set_read_only_gen(value))
def _set_deferrable(self, value: Optional[bool]) -> None:
+ self.set_deferrable(value)
+
+ def set_deferrable(self, value: Optional[bool]) -> None:
+ """Method version of the `~Connection.deferrable` setter."""
with self.lock:
self.wait(self._set_deferrable_gen(value))
import psycopg
-async def aconn_set(conn, param, value):
- """Equivalent of 'await conn.set_param(value)'
-
- Converted to conn_set in sync tests.
- """
- await getattr(conn, f"set_{param}")(value)
-
-
-def conn_set(conn, param, value):
- """Equivalent of 'conn.param = value'.
-
- Converted from aconn_set in sync tests.
- """
- setattr(conn, param, value)
-
-
@pytest.fixture
def testctx(svcconn):
svcconn.execute("create table if not exists testctx (id int primary key)")
from .utils import gc_collect, is_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
-from ._test_connection import conninfo_params_timeout, conn_set
+from ._test_connection import conninfo_params_timeout
from ._test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
def test_autocommit(conn):
assert conn.autocommit is False
+ conn.set_autocommit(True)
+ assert conn.autocommit
+ cur = conn.cursor()
+ cur.execute("select 1")
+ assert cur.fetchone() == (1,)
+ assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+
+ conn.set_autocommit("")
+ assert isinstance(conn.autocommit, bool)
+ assert conn.autocommit is False
+
+ conn.set_autocommit("yeah")
+ assert isinstance(conn.autocommit, bool)
+ assert conn.autocommit is True
+
+
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+def test_autocommit_property(conn):
+ assert conn.autocommit is False
conn.autocommit = True
assert conn.autocommit
assert cur.fetchone() == (1,)
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
with pytest.raises(psycopg.ProgrammingError):
- conn.autocommit = True
+ conn.set_autocommit(True)
assert not conn.autocommit
cur.execute("meh")
assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
with pytest.raises(psycopg.ProgrammingError):
- conn.autocommit = True
+ conn.set_autocommit(True)
assert not conn.autocommit
conn.close()
assert conn.pgconn.transaction_status == conn.TransactionStatus.UNKNOWN
with pytest.raises(psycopg.OperationalError):
- conn.autocommit = True
+ conn.set_autocommit(True)
assert not conn.autocommit
conn.add_notify_handler(cb1)
conn.add_notify_handler(lambda n: nots2.append(n))
- conn.autocommit = True
+ conn.set_autocommit(True)
cur = conn.cursor()
cur.execute("listen foo")
cur.execute("notify foo, 'n1'")
@pytest.mark.parametrize("autocommit", [True, False])
@pytest.mark.parametrize("param", tx_params_isolation)
def test_set_transaction_param_implicit(conn, param, autocommit):
- conn.autocommit = autocommit
+ conn.set_autocommit(autocommit)
for value in param.values:
- conn_set(conn, param.name, value)
+ getattr(conn, f"set_{param.name}")(value)
cur = conn.execute(
"select current_setting(%s), current_setting(%s)",
[f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
conn.commit()
for value in param.values:
- conn_set(conn, param.name, value)
+ getattr(conn, f"set_{param.name}")(value)
cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
(pgval,) = cur.fetchone()
assert tx_values_map[pgval] == value
conn.rollback()
- conn_set(conn, param.name, None)
+ getattr(conn, f"set_{param.name}")(None)
cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
(pgval,) = cur.fetchone()
assert tx_values_map[pgval] == tx_values_map[param.non_default]
@pytest.mark.parametrize("autocommit", [True, False])
@pytest.mark.parametrize("param", tx_params_isolation)
def test_set_transaction_param_block(conn, param, autocommit):
- conn.autocommit = autocommit
+ conn.set_autocommit(autocommit)
for value in param.values:
- conn_set(conn, param.name, value)
+ getattr(conn, f"set_{param.name}")(value)
with conn.transaction():
cur = conn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
conn.execute("select 1")
value = param.values[0]
with pytest.raises(psycopg.ProgrammingError):
- conn_set(conn, param.name, value)
+ getattr(conn, f"set_{param.name}")(value)
@pytest.mark.parametrize("param", tx_params)
value = param.values[0]
with conn.transaction():
with pytest.raises(psycopg.ProgrammingError):
- conn_set(conn, param.name, value)
+ getattr(conn, f"set_{param.name}")(value)
@pytest.mark.parametrize("param", tx_params)
def test_set_transaction_param_not_intrans_external(conn, param):
value = param.values[0]
-
- conn.autocommit = True
+ conn.set_autocommit(True)
conn.execute("begin")
with pytest.raises(psycopg.ProgrammingError):
- conn_set(conn, param.name, value)
+ getattr(conn, f"set_{param.name}")(value)
+
+
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@pytest.mark.crdb("skip", reason="transaction isolation")
+def test_set_transaction_param_all_property(conn):
+ params: List[Any] = tx_params[:]
+ params[2] = params[2].values[0]
+
+ for param in params:
+ value = param.values[0]
+ setattr(conn, param.name, value)
+
+ for param in params:
+ cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
+ pgval = cur.fetchone()[0]
+ assert tx_values_map[pgval] == value
@pytest.mark.crdb("skip", reason="transaction isolation")
for param in params:
value = param.values[0]
- conn_set(conn, param.name, value)
+ getattr(conn, f"set_{param.name}")(value)
for param in params:
cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
def test_set_transaction_param_strange(conn):
+ for val in ("asdf", 0, 5):
+ with pytest.raises(ValueError):
+ conn.set_isolation_level(val)
+
+ conn.set_isolation_level(psycopg.IsolationLevel.SERIALIZABLE.value)
+ assert conn.isolation_level is psycopg.IsolationLevel.SERIALIZABLE
+
+ conn.set_read_only(1)
+ assert conn.read_only is True
+
+ conn.set_deferrable(0)
+ assert conn.deferrable is False
+
+
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+def test_set_transaction_param_strange_property(conn):
for val in ("asdf", 0, 5):
with pytest.raises(ValueError):
conn.isolation_level = val
from .utils import gc_collect, is_async
from ._test_cursor import my_row_factory
from ._test_connection import tx_params, tx_params_isolation, tx_values_map
-from ._test_connection import conninfo_params_timeout, aconn_set
+from ._test_connection import conninfo_params_timeout
from ._test_connection import testctx # noqa: F401 # fixture
from .test_adapt import make_bin_dumper, make_dumper
assert aconn.autocommit is True
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+def test_autocommit_property(conn):
+ assert conn.autocommit is False
+
+ conn.autocommit = True
+ assert conn.autocommit
+ cur = conn.cursor()
+ cur.execute("select 1")
+ assert cur.fetchone() == (1,)
+ assert conn.pgconn.transaction_status == conn.TransactionStatus.IDLE
+
+ conn.autocommit = ""
+ assert isinstance(conn.autocommit, bool)
+ assert conn.autocommit is False
+
+ conn.autocommit = "yeah"
+ assert isinstance(conn.autocommit, bool)
+ assert conn.autocommit is True
+
+
async def test_autocommit_connect(aconn_cls, dsn):
aconn = await aconn_cls.connect(dsn, autocommit=True)
assert aconn.autocommit
async def test_set_transaction_param_implicit(aconn, param, autocommit):
await aconn.set_autocommit(autocommit)
for value in param.values:
- await aconn_set(aconn, param.name, value)
+ await getattr(aconn, f"set_{param.name}")(value)
cur = await aconn.execute(
"select current_setting(%s), current_setting(%s)",
[f"transaction_{param.guc}", f"default_transaction_{param.guc}"],
await aconn.commit()
for value in param.values:
- await aconn_set(aconn, param.name, value)
+ await getattr(aconn, f"set_{param.name}")(value)
cur = await aconn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
)
assert tx_values_map[pgval] == value
await aconn.rollback()
- await aconn_set(aconn, param.name, None)
+ await getattr(aconn, f"set_{param.name}")(None)
cur = await aconn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
)
async def test_set_transaction_param_block(aconn, param, autocommit):
await aconn.set_autocommit(autocommit)
for value in param.values:
- await aconn_set(aconn, param.name, value)
+ await getattr(aconn, f"set_{param.name}")(value)
async with aconn.transaction():
cur = await aconn.execute(
"select current_setting(%s)", [f"transaction_{param.guc}"]
await aconn.execute("select 1")
value = param.values[0]
with pytest.raises(psycopg.ProgrammingError):
- await aconn_set(aconn, param.name, value)
+ await getattr(aconn, f"set_{param.name}")(value)
@pytest.mark.parametrize("param", tx_params)
value = param.values[0]
async with aconn.transaction():
with pytest.raises(psycopg.ProgrammingError):
- await aconn_set(aconn, param.name, value)
+ await getattr(aconn, f"set_{param.name}")(value)
@pytest.mark.parametrize("param", tx_params)
await aconn.set_autocommit(True)
await aconn.execute("begin")
with pytest.raises(psycopg.ProgrammingError):
- await aconn_set(aconn, param.name, value)
+ await getattr(aconn, f"set_{param.name}")(value)
+
+
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+@pytest.mark.crdb("skip", reason="transaction isolation")
+def test_set_transaction_param_all_property(conn):
+ params: List[Any] = tx_params[:]
+ params[2] = params[2].values[0]
+
+ for param in params:
+ value = param.values[0]
+ setattr(conn, param.name, value)
+
+ for param in params:
+ cur = conn.execute("select current_setting(%s)", [f"transaction_{param.guc}"])
+ pgval = cur.fetchone()[0]
+ assert tx_values_map[pgval] == value
@pytest.mark.crdb("skip", reason="transaction isolation")
for param in params:
value = param.values[0]
- await aconn_set(aconn, param.name, value)
+ await getattr(aconn, f"set_{param.name}")(value)
for param in params:
cur = await aconn.execute(
assert aconn.deferrable is False
+@pytest.mark.skipif(is_async(__name__), reason="sync test only")
+def test_set_transaction_param_strange_property(conn):
+ for val in ("asdf", 0, 5):
+ with pytest.raises(ValueError):
+ conn.isolation_level = val
+
+ conn.isolation_level = psycopg.IsolationLevel.SERIALIZABLE.value
+ assert conn.isolation_level is psycopg.IsolationLevel.SERIALIZABLE
+
+ conn.read_only = 1
+ assert conn.read_only is True
+
+ conn.deferrable = 0
+ assert conn.deferrable is False
+
+
@pytest.mark.parametrize("dsn, kwargs, exp", conninfo_params_timeout)
async def test_get_connection_params(aconn_cls, dsn, kwargs, exp, setpgenv):
setpgenv({})
def test_copy_bad_result(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
cur = conn.cursor()
def test_stream_error_notx(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
cur = conn.cursor()
with pytest.raises(psycopg.ProgrammingError):
for rec in cur.stream("wat"):
@pytest.mark.parametrize("autocommit", [False, True])
def test_stream_close(conn, autocommit):
- conn.autocommit = autocommit
+ conn.set_autocommit(autocommit)
cur = conn.cursor()
with pytest.raises(psycopg.OperationalError):
for rec in cur.stream("select generate_series(1, 3)"):
notices = []
conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline():
cur = conn.execute("select 'test'")
assert cur.fetchone() == ("test",)
def test_pipeline_errors_processed_at_exit(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with pytest.raises(e.UndefinedTable):
with conn.pipeline():
conn.execute("select * from nosuchtable")
def test_autocommit(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline(), conn.cursor() as c:
c.execute("select 1")
def test_pipeline_aborted(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline() as p:
c1 = conn.execute("select 1")
with pytest.raises(e.UndefinedTable):
@pytest.mark.flakey("assert rarely fails randomly in CI blocking release")
def test_sync_syncs_errors(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline() as p:
conn.execute("select 1 from nosuchtable")
with pytest.raises(e.UndefinedTable):
def test_implicit_transaction(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline():
assert conn.pgconn.transaction_status == pq.TransactionStatus.IDLE
conn.execute("select 'before'")
def test_executemany(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
conn.execute("drop table if exists execmanypipeline")
conn.execute(
"create unlogged table execmanypipeline (id serial primary key, num integer)"
def test_executemany_no_returning(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
conn.execute("drop table if exists execmanypipelinenoreturning")
conn.execute(
"""create unlogged table execmanypipelinenoreturning
@pytest.mark.crdb("skip", reason="temp tables")
def test_executemany_trace(conn, trace):
- conn.autocommit = True
+ conn.set_autocommit(True)
cur = conn.cursor()
cur.execute("create temp table trace (id int)")
t = trace.trace(conn)
@pytest.mark.crdb("skip", reason="temp tables")
def test_executemany_trace_returning(conn, trace):
- conn.autocommit = True
+ conn.set_autocommit(True)
cur = conn.cursor()
cur.execute("create temp table trace (id int)")
t = trace.trace(conn)
def test_prepared(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline():
c1 = conn.execute("select %s::int", [10], prepare=True)
c2 = conn.execute(
An invalid prepared statement, in a pipeline, should be discarded at exit
and not reused.
"""
-
- conn.autocommit = True
+ conn.set_autocommit(True)
stmt = "INSERT INTO nosuchtable(data) VALUES (%s)"
with pytest.raises(psycopg.errors.UndefinedTable):
with conn.pipeline():
def test_rollback_explicit(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline():
with pytest.raises(e.DivisionByZero):
cur = conn.execute("select 1 / %s", [0])
def test_rollback_transaction(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
with pytest.raises(e.DivisionByZero):
with conn.pipeline():
with conn.transaction():
notices = []
conn.add_notice_handler(lambda diag: notices.append(diag.message_primary))
- conn.autocommit = True
+ conn.set_autocommit(True)
with conn.pipeline():
cur = conn.execute("select 'test'")
assert cur.fetchone() == ("test",)
conn.execute("update accessed set value = now()")
return cur
- conn.autocommit = True
+ conn.set_autocommit(True)
(before,) = conn.execute("select value from accessed").fetchone()
def test_no_prepare_error(conn):
- conn.autocommit = True
+ conn.set_autocommit(True)
for i in range(10):
with pytest.raises(conn.ProgrammingError):
conn.execute("select wat")
def test_recovered_xids(self, conn, tpc):
# insert a few test xns
-
- conn.autocommit = True
+ conn.set_autocommit(True)
cur = conn.cursor()
cur.execute("begin; prepare transaction '1-foo'")
cur.execute("begin; prepare transaction '2-bar'")
or the autocommit setting on the connection, as this would interfere
with the transaction scope being managed by the Transaction block.
"""
-
- conn.autocommit = False
+ conn.set_autocommit(False)
conn.commit()
conn.rollback()
with conn.transaction():
with pytest.raises(e.ProgrammingError):
- conn.autocommit = False
+ conn.set_autocommit(False)
with pytest.raises(e.ProgrammingError):
conn.commit()
with pytest.raises(e.ProgrammingError):
conn.rollback()
- conn.autocommit = False
+ conn.set_autocommit(False)
conn.commit()
conn.rollback()
"""
Connection.autocommit is unchanged both during and after Transaction block.
"""
-
- conn.autocommit = autocommit
+ conn.set_autocommit(autocommit)
with conn.transaction():
assert conn.autocommit is autocommit
assert conn.autocommit is autocommit
Outcome:
* Changes made within Transaction context are committed
"""
-
- conn.autocommit = False
+ conn.set_autocommit(False)
assert not in_transaction(conn)
with conn.transaction():
insert_row(conn, "new")
Outcome:
* Changes made within Transaction context are discarded
"""
-
- conn.autocommit = False
+ conn.set_autocommit(False)
assert not in_transaction(conn)
with pytest.raises(ExpectedException):
with conn.transaction():
* Outer transaction is left running, and no changes are visible to an
outside observer from another connection.
"""
-
- conn.autocommit = False
+ conn.set_autocommit(False)
insert_row(conn, "prior")
if pipeline:
pipeline.sync()
* Outer transaction is left running, and no changes are visible to an
outside observer from another connection.
"""
-
- conn.autocommit = False
+ conn.set_autocommit(False)
insert_row(conn, "prior")
if pipeline:
pipeline.sync()
@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
def test_out_of_order_exit(conn, exit_error):
- conn.autocommit = True
+ conn.set_autocommit(True)
t1 = conn.transaction()
t1.__enter__()
@pytest.mark.parametrize("exit_error", [None, ZeroDivisionError, Rollback])
def test_out_of_order_exit_same_name(conn, exit_error):
- conn.autocommit = True
+ conn.set_autocommit(True)
t1 = conn.transaction("save")
t1.__enter__()
tree = BlanksInserter().visit(tree)
tree = RenameAsyncToSync().visit(tree)
tree = AsyncToSync().visit(tree)
- tree = FixAsyncSetters().visit(tree)
return tree
"acommands": "commands",
"aconn": "conn",
"aconn_cls": "conn_cls",
- "aconn_set": "conn_set",
"alist": "list",
"anext": "next",
"apipeline": "pipeline",
return node
-class FixAsyncSetters(ast.NodeTransformer):
- setters_map = {
- "set_autocommit": "autocommit",
- "set_read_only": "read_only",
- "set_isolation_level": "isolation_level",
- "set_deferrable": "deferrable",
- }
-
- def visit_Call(self, node: ast.Call) -> ast.AST:
- new_node = self._fix_setter(node)
- if new_node:
- return new_node
-
- self.generic_visit(node)
- return node
-
- def _fix_setter(self, node: ast.Call) -> ast.AST | None:
- if not isinstance(node.func, ast.Attribute):
- return None
- if node.func.attr not in self.setters_map:
- return None
- obj = node.func.value
- arg = node.args[0]
- new_node = ast.Assign(
- targets=[ast.Attribute(value=obj, attr=self.setters_map[node.func.attr])],
- value=arg,
- )
- ast.copy_location(new_node, node)
- return new_node
-
-
class BlanksInserter(ast.NodeTransformer):
"""
Restore the missing spaces in the source (or something similar)