OidDumperCache: TypeAlias = Dict[int, "Dumper"]
LoaderCache: TypeAlias = Dict[int, "Loader"]
+TEXT = pq.Format.TEXT
+
class Transformer(AdaptContext):
"""
return out
types = [INVALID_OID] * nparams
- pqformats = [pq.Format.TEXT] * nparams
+ pqformats = [TEXT] * nparams
for i in range(nparams):
param = params[i]
from ._queries import PostgresQuery, PostgresClientQuery
+from . import pq
from . import adapt
from . import errors as e
-from .pq import Format
from .abc import ConnectionType, Query, Params
from .rows import Row
from .cursor import BaseCursor, Cursor
from .connection import Connection # noqa: F401
from .connection_async import AsyncConnection # noqa: F401
+TEXT = pq.Format.TEXT
+BINARY = pq.Format.BINARY
+
class ClientCursorMixin(BaseCursor[ConnectionType, Row]):
def mogrify(self, query: Query, params: Optional[Params] = None) -> str:
if binary is None:
fmt = self.format
else:
- fmt = Format.BINARY if binary else Format.TEXT
+ fmt = BINARY if binary else TEXT
- if fmt == Format.BINARY:
+ if fmt == BINARY:
raise e.NotSupportedError(
"client-side cursors don't support binary results"
)
from . import errors as e
from . import waiting
from . import postgres
-from .pq import ConnStatus, ExecStatus, TransactionStatus, Format
+from .pq import ConnStatus, ExecStatus, TransactionStatus
from .abc import AdaptContext, ConnectionType, Params, Query, RV
from .abc import PQGen, PQGenConn
from .sql import Composable, SQL
from .pq.abc import PGconn, PGresult
from psycopg_pool.base import BasePool
-logger = logging.getLogger("psycopg")
-
-# Row Type variable for Cursor (when it needs to be distinguished from the
-# connection's one)
-CursorRow = TypeVar("CursorRow")
-
if _psycopg:
connect = _psycopg.connect
execute = _psycopg.execute
connect = generators.connect
execute = generators.execute
+logger = logging.getLogger("psycopg")
+
+# Row Type variable for Cursor (when it needs to be distinguished from the
+# connection's one)
+CursorRow = TypeVar("CursorRow")
+
+TEXT = pq.Format.TEXT
+BINARY = pq.Format.BINARY
+
class Notify(NamedTuple):
"""An asynchronous notification received from the database."""
return conn
def _exec_command(
- self, command: Query, result_format: Format = Format.TEXT
+ self, command: Query, result_format: pq.Format = TEXT
) -> PQGen[Optional["PGresult"]]:
"""
Generator to send a command and receive the result to the backend.
command = command.as_bytes(self)
if self._pipeline:
- if result_format == Format.TEXT:
+ if result_format == TEXT:
cmd = partial(self.pgconn.send_query, command)
else:
cmd = partial(
self._pipeline.result_queue.append(None)
return None
- if result_format == Format.TEXT:
+ if result_format == TEXT:
self.pgconn.send_query(command)
else:
self.pgconn.send_query_params(command, None, result_format=result_format)
cur = self.cursor_factory(self, row_factory=row_factory)
if binary:
- cur.format = Format.BINARY
+ cur.format = BINARY
return cur
try:
cur = self.cursor()
if binary:
- cur.format = Format.BINARY
+ cur.format = BINARY
return cur.execute(query, params, prepare=prepare)
from typing import Type, Union, cast, overload, TYPE_CHECKING
from contextlib import asynccontextmanager
+from . import pq
from . import errors as e
from . import waiting
-from .pq import Format, TransactionStatus
+from .pq import TransactionStatus
from .abc import AdaptContext, Params, PQGen, PQGenConn, Query, RV
from ._tpc import Xid
from .rows import Row, AsyncRowFactory, tuple_row, TupleRow, args_row
if TYPE_CHECKING:
from .pq.abc import PGconn
+TEXT = pq.Format.TEXT
+BINARY = pq.Format.BINARY
logger = logging.getLogger("psycopg")
cur = self.cursor_factory(self, row_factory=row_factory)
if binary:
- cur.format = Format.BINARY
+ cur.format = BINARY
return cur
try:
cur = self.cursor()
if binary:
- cur.format = Format.BINARY
+ cur.format = BINARY
return await cur.execute(query, params, prepare=prepare)
TEXT = pq.Format.TEXT
BINARY = pq.Format.BINARY
+PY_TEXT = PyFormat.TEXT
+PY_BINARY = PyFormat.BINARY
class BaseCopy(Generic[ConnectionType]):
assert tx.pgresult, "The Transformer doesn't have a PGresult set"
self._pgresult: "PGresult" = tx.pgresult
- if self._pgresult.binary_tuples == pq.Format.TEXT:
+ if self._pgresult.binary_tuples == TEXT:
self.formatter = TextFormatter(tx, encoding=pgconn_encoding(self._pgconn))
else:
self.formatter = BinaryFormatter(tx)
class TextFormatter(Formatter):
- format = pq.Format.TEXT
+ format = TEXT
def __init__(self, transformer: Transformer, encoding: str = "utf-8"):
super().__init__(transformer)
class BinaryFormatter(Formatter):
- format = pq.Format.BINARY
+ format = BINARY
def __init__(self, transformer: Transformer):
super().__init__(transformer)
for item in row:
if item is not None:
- dumper = tx.get_dumper(item, PyFormat.TEXT)
+ dumper = tx.get_dumper(item, PY_TEXT)
b = dumper.dump(item)
out += _dump_re.sub(_dump_sub, b)
else:
out = bytearray()
out += _pack_int2(len(row))
- adapted = tx.dump_sequence(row, [PyFormat.BINARY] * len(row))
+ adapted = tx.dump_sequence(row, [PY_BINARY] * len(row))
for b in adapted:
if b is not None:
out += _pack_int4(len(b))
from . import pq
from . import adapt
from . import errors as e
-from .pq import ExecStatus, Format
+from .pq import ExecStatus
from .abc import ConnectionType, Query, Params, PQGen
from .copy import Copy
from .rows import Row, RowMaker, RowFactory
_C = TypeVar("_C", bound="Cursor[Any]")
+TEXT = pq.Format.TEXT
+BINARY = pq.Format.BINARY
+
class BaseCursor(Generic[ConnectionType, Row]):
__slots__ = """
def __init__(self, connection: ConnectionType):
self._conn = connection
- self.format = Format.TEXT
+ self.format = TEXT
self._pgconn = connection.pgconn
self._adapters = adapt.AdaptersMap(connection.adapters)
self.arraysize = 1
if binary is None:
fmt = self.format
else:
- fmt = Format.BINARY if binary else Format.TEXT
+ fmt = BINARY if binary else TEXT
self._query = query
- if query.params or no_pqexec or fmt == Format.BINARY:
+ if query.params or no_pqexec or fmt == BINARY:
if self._conn._pipeline:
self._conn._pipeline.command_queue.append(
partial(
f"unexpected result status from query: {ExecStatus(result.status).name}"
)
- def _set_current_result(self, i: int, format: Optional[Format] = None) -> None:
+ def _set_current_result(self, i: int, format: Optional[pq.Format] = None) -> None:
"""
Select one of the results in the cursor as the active one.
"""
if binary is None:
fmt = self.format
else:
- fmt = Format.BINARY if binary else Format.TEXT
+ fmt = BINARY if binary else TEXT
if self._conn._pipeline:
self._conn._pipeline.command_queue.append(
DEFAULT_ITERSIZE = 100
+TEXT = pq.Format.TEXT
+BINARY = pq.Format.BINARY
+
class ServerCursorMixin(BaseCursor[ConnectionType, Row]):
"""Mixin to add ServerCursor behaviour and implementation a BaseCursor."""
self._withhold = withhold
self._described = False
self.itersize: int = DEFAULT_ITERSIZE
- self._format = pq.Format.TEXT
+ self._format = TEXT
def __repr__(self) -> str:
# Insert the name as the second word
if binary is None:
self._format = self.format
else:
- self._format = pq.Format.BINARY if binary else pq.Format.TEXT
+ self._format = BINARY if binary else TEXT
# The above result only returned COMMAND_OK. Get the cursor shape
yield from self._describe_gen()