self._set_autocommit(value)
def _set_autocommit(self, value: bool) -> None:
- # Base implementation, not thread safe.
- # Subclasses must call it holding a lock
- self._check_intrans("autocommit")
+ raise NotImplementedError
+
+ def _set_autocommit_gen(self, value: bool) -> PQGen[None]:
+ yield from self._check_intrans_gen("autocommit")
self._autocommit = bool(value)
@property
self._set_isolation_level(value)
def _set_isolation_level(self, value: Optional[IsolationLevel]) -> None:
- # Base implementation, not thread safe.
- # Subclasses must call it holding a lock
- self._check_intrans("isolation_level")
+ raise NotImplementedError
+
+ def _set_isolation_level_gen(self, value: Optional[IsolationLevel]) -> PQGen[None]:
+ yield from self._check_intrans_gen("isolation_level")
self._isolation_level = IsolationLevel(value) if value is not None else None
self._begin_statement = b""
self._set_read_only(value)
def _set_read_only(self, value: Optional[bool]) -> None:
- # Base implementation, not thread safe.
- # Subclasses must call it holding a lock
- self._check_intrans("read_only")
+ raise NotImplementedError
+
+ def _set_read_only_gen(self, value: Optional[bool]) -> PQGen[None]:
+ yield from self._check_intrans_gen("read_only")
self._read_only = bool(value)
self._begin_statement = b""
self._set_deferrable(value)
def _set_deferrable(self, value: Optional[bool]) -> None:
- # Base implementation, not thread safe.
- # Subclasses must call it holding a lock
- self._check_intrans("deferrable")
+ raise NotImplementedError
+
+ def _set_deferrable_gen(self, value: Optional[bool]) -> PQGen[None]:
+ yield from self._check_intrans_gen("deferrable")
self._deferrable = bool(value)
self._begin_statement = b""
- def _check_intrans(self, attribute: str) -> None:
+ def _check_intrans_gen(self, attribute: str) -> PQGen[None]:
# Raise an exception if we are in a transaction
status = self.pgconn.transaction_status
+ if status == TransactionStatus.IDLE and self._pipeline:
+ yield from self._pipeline._sync_gen()
+ status = self.pgconn.transaction_status
if status != TransactionStatus.IDLE:
if self._num_transactions:
raise e.ProgrammingError(
def _set_autocommit(self, value: bool) -> None:
with self.lock:
- super()._set_autocommit(value)
+ self.wait(self._set_autocommit_gen(value))
def _set_isolation_level(self, value: Optional[IsolationLevel]) -> None:
with self.lock:
- super()._set_isolation_level(value)
+ self.wait(self._set_isolation_level_gen(value))
def _set_read_only(self, value: Optional[bool]) -> None:
with self.lock:
- super()._set_read_only(value)
+ self.wait(self._set_read_only_gen(value))
def _set_deferrable(self, value: Optional[bool]) -> None:
with self.lock:
- super()._set_deferrable(value)
+ self.wait(self._set_deferrable_gen(value))
def tpc_begin(self, xid: Union[Xid, str]) -> None:
"""
async def set_autocommit(self, value: bool) -> None:
"""Async version of the `~Connection.autocommit` setter."""
async with self.lock:
- super()._set_autocommit(value)
+ await self.wait(self._set_autocommit_gen(value))
def _set_isolation_level(self, value: Optional[IsolationLevel]) -> None:
self._no_set_async("isolation_level")
async def set_isolation_level(self, value: Optional[IsolationLevel]) -> None:
"""Async version of the `~Connection.isolation_level` setter."""
async with self.lock:
- super()._set_isolation_level(value)
+ await self.wait(self._set_isolation_level_gen(value))
def _set_read_only(self, value: Optional[bool]) -> None:
self._no_set_async("read_only")
async def set_read_only(self, value: Optional[bool]) -> None:
"""Async version of the `~Connection.read_only` setter."""
async with self.lock:
- super()._set_read_only(value)
+ await self.wait(self._set_read_only_gen(value))
def _set_deferrable(self, value: Optional[bool]) -> None:
self._no_set_async("deferrable")
async def set_deferrable(self, value: Optional[bool]) -> None:
"""Async version of the `~Connection.deferrable` setter."""
async with self.lock:
- super()._set_deferrable(value)
+ await self.wait(self._set_deferrable_gen(value))
def _no_set_async(self, attribute: str) -> None:
raise AttributeError(
assert await inserted(aconn) == {"foo", "baz"}
-async def test_prohibits_use_of_commit_rollback_autocommit(aconn, apipeline):
+async def test_prohibits_use_of_commit_rollback_autocommit(aconn):
"""
Within a Transaction block, it is forbidden to touch commit, rollback,
or the autocommit setting on the connection, as this would interfere
with the transaction scope being managed by the Transaction block.
"""
- if apipeline:
- # TODO: Fixing Connection._check_intrans() would require calling
- # conn._pipeline.sync(), which implies turning _check_intrans() into a
- # generator method.
- pytest.xfail("Connection._check_intrans() does not account for pipeline mode")
await aconn.set_autocommit(False)
await aconn.commit()
await aconn.rollback()