import logging
import threading
from types import TracebackType
-from typing import Any, cast, Dict, Generator, Iterator
-from typing import List, Optional, Type, TypeVar, Union
-from typing import overload, TYPE_CHECKING
+from typing import Any, Generator, Iterator, Dict, List, Optional
+from typing import Type, TypeVar, Union, cast, overload, TYPE_CHECKING
from contextlib import contextmanager
from . import pq
from . import errors as e
from . import waiting
-from .abc import AdaptContext, Params, Query, RV
-from .abc import PQGen, PQGenConn
+from .abc import AdaptContext, Params, PQGen, PQGenConn, Query, RV
from ._tpc import Xid
from .rows import Row, RowFactory, tuple_row, TupleRow, args_row
from .adapt import AdaptersMap
from .cursor import Cursor
from .conninfo import make_conninfo, conninfo_to_dict
from ._pipeline import Pipeline
-from .generators import notifies
from ._encodings import pgconn_encoding
+from .generators import notifies
from .transaction import Transaction
from .server_cursor import ServerCursor
from ._connection_base import BaseConnection, CursorRow, Notify
"""
Connect to a database server and return a new `Connection` instance.
"""
+
params = cls._get_connection_params(conninfo, **kwargs)
conninfo = make_conninfo(**params)
try:
self.rollback()
except Exception as exc2:
- logger.warning(
- "error ignored in rollback on %s: %s",
- self,
- exc2,
- )
+ logger.warning("error ignored in rollback on %s: %s", self, exc2)
else:
self.commit()
withhold: bool = False,
) -> Union[Cursor[Any], ServerCursor[Any]]:
"""
- Return a new cursor to send commands and queries to the connection.
+ Return a new `Cursor` to send commands and queries to the connection.
"""
self._check_connection_ok()
cur.format = BINARY
return cur.execute(query, params, prepare=prepare)
-
except e._NO_TRACEBACK as ex:
raise ex.with_traceback(None)
@contextmanager
def transaction(
- self,
- savepoint_name: Optional[str] = None,
- force_rollback: bool = False,
+ self, savepoint_name: Optional[str] = None, force_rollback: bool = False
) -> Iterator[Transaction]:
"""
Start a context block with a new transaction or nested transaction.
@contextmanager
def pipeline(self) -> Iterator[Pipeline]:
- """Switch the connection into pipeline mode."""
+ """Context manager to switch the connection into pipeline mode."""
with self.lock:
self._check_connection_ok()
try:
await self.rollback()
except Exception as exc2:
- logger.warning(
- "error ignored in rollback on %s: %s",
- self,
- exc2,
- )
+ logger.warning("error ignored in rollback on %s: %s", self, exc2)
else:
await self.commit()
@asynccontextmanager
async def transaction(
- self,
- savepoint_name: Optional[str] = None,
- force_rollback: bool = False,
+ self, savepoint_name: Optional[str] = None, force_rollback: bool = False
) -> AsyncIterator[AsyncTransaction]:
"""
Start a context block with a new transaction or nested transaction.
Commit a prepared two-phase transaction.
"""
async with self.lock:
- await self.wait(self._tpc_finish_gen("commit", xid))
+ await self.wait(self._tpc_finish_gen("COMMIT", xid))
async def tpc_rollback(self, xid: Union[Xid, str, None] = None) -> None:
"""
Roll back a prepared two-phase transaction.
"""
async with self.lock:
- await self.wait(self._tpc_finish_gen("rollback", xid))
+ await self.wait(self._tpc_finish_gen("ROLLBACK", xid))
async def tpc_recover(self) -> List[Xid]:
self._check_tpc()