) -> Dict[str, Any]:
"""Manipulate connection parameters before connecting.
- .. versionchanged:: 3.1
- Unlike the sync counterpart, perform non-blocking address
- resolution and populate the ``hostaddr`` connection parameter,
- unless the user has provided one themselves. See
- `~psycopg._dns.resolve_hostaddr_async()` for details.
-
+ :param conninfo: Connection string as received by `~Connection.connect()`.
+ :param kwargs: Overriding connection arguments as received by `!connect()`.
+ :return: Connection arguments merged and eventually modified, in a
+ format similar to `~conninfo.conninfo_to_dict()`.
"""
params = conninfo_to_dict(conninfo, **kwargs)
return params
async def close(self) -> None:
+ """Close the database connection."""
if self.closed:
return
self._closed = True
prepare: Optional[bool] = None,
binary: bool = False,
) -> AsyncCursor[Row]:
+ """Execute a query and return a cursor to read its results."""
try:
cur = self.cursor()
if binary:
raise ex.with_traceback(None)
async def commit(self) -> None:
+ """Commit any pending transaction to the database."""
async with self.lock:
await self.wait(self._commit_gen())
async def rollback(self) -> None:
+ """Roll back to the start of any pending transaction."""
async with self.lock:
await self.wait(self._rollback_gen())
"""
Start a context block with a new transaction or nested transaction.
+ :param savepoint_name: Name of the savepoint used to manage a nested
+ transaction. If `!None`, one will be chosen automatically.
+ :param force_rollback: Roll back the transaction at the end of the
+ block even if there were no error (e.g. to try a no-op process).
:rtype: AsyncTransaction
"""
tx = AsyncTransaction(self, savepoint_name, force_rollback)
yield tx
async def notifies(self) -> AsyncGenerator[Notify, None]:
+ """
+ Yield `Notify` objects as soon as they are received from the database.
+ """
while True:
async with self.lock:
try:
self._pipeline = None
async def wait(self, gen: PQGen[RV], timeout: Optional[float] = 0.1) -> RV:
+ """
+ Consume a generator operating on the connection.
+
+ The function must be used on generators that don't change connection
+ fd (i.e. not on connect and reset).
+ """
try:
return await waiting.wait_async(gen, self.pgconn.socket, timeout=timeout)
except (asyncio.CancelledError, KeyboardInterrupt):
@classmethod
async def _wait_conn(cls, gen: PQGenConn[RV], timeout: Optional[int]) -> RV:
+ """Consume a connection generator."""
return await waiting.wait_conn_async(gen, timeout)
def _set_autocommit(self, value: bool) -> None:
self._no_set_async("autocommit")
async def set_autocommit(self, value: bool) -> None:
- """Async version of the `~Connection.autocommit` setter."""
+ """Method version of the `~Connection.autocommit` setter."""
async with self.lock:
await self.wait(self._set_autocommit_gen(value))
self._no_set_async("isolation_level")
async def set_isolation_level(self, value: Optional[IsolationLevel]) -> None:
- """Async version of the `~Connection.isolation_level` setter."""
+ """Method version of the `~Connection.isolation_level` setter."""
async with self.lock:
await self.wait(self._set_isolation_level_gen(value))
self._no_set_async("read_only")
async def set_read_only(self, value: Optional[bool]) -> None:
- """Async version of the `~Connection.read_only` setter."""
+ """Method version of the `~Connection.read_only` setter."""
async with self.lock:
await self.wait(self._set_read_only_gen(value))
self._no_set_async("deferrable")
async def set_deferrable(self, value: Optional[bool]) -> None:
- """Async version of the `~Connection.deferrable` setter."""
+ """Method version of the `~Connection.deferrable` setter."""
async with self.lock:
await self.wait(self._set_deferrable_gen(value))
)
async def tpc_begin(self, xid: Union[Xid, str]) -> None:
+ """
+ Begin a TPC transaction with the given transaction ID `!xid`.
+ """
async with self.lock:
await self.wait(self._tpc_begin_gen(xid))
async def tpc_prepare(self) -> None:
+ """
+ Perform the first phase of a transaction started with `tpc_begin()`.
+ """
try:
async with self.lock:
await self.wait(self._tpc_prepare_gen())
raise e.NotSupportedError(str(ex)) from None
async def tpc_commit(self, xid: Union[Xid, str, None] = None) -> None:
+ """
+ Commit a prepared two-phase transaction.
+ """
async with self.lock:
await self.wait(self._tpc_finish_gen("commit", xid))
async def tpc_rollback(self, xid: Union[Xid, str, None] = None) -> None:
+ """
+ Roll back a prepared two-phase transaction.
+ """
async with self.lock:
await self.wait(self._tpc_finish_gen("rollback", xid))