:sql:`integer`, :sql:`bigint`, :sql:`smallint` according to its value).
- According to the placeholder used (``%s``, ``%b``, ``%t``), Psycopg may
- pick a binary or a text dumper. When using the ``%s`` "`~Format.AUTO`"
+ pick a binary or a text dumper. When using the ``%s`` "`~PyFormat.AUTO`"
format, if the same type has both a text and a binary dumper registered, the
last one registered (using `Dumper.register()`) will be selected.
move to API section
-.. autoclass:: Format
+.. autoclass:: PyFormat
:members:
from . import pq
-class Format(str, Enum):
+class PyFormat(str, Enum):
"""
Enum representing the format wanted for a query argument.
"""Binary parameter (``%b`` placeholder)."""
@classmethod
- def from_pq(cls, fmt: pq.Format) -> "Format":
+ def from_pq(cls, fmt: pq.Format) -> "PyFormat":
return _pg2py[fmt]
@classmethod
- def as_pq(cls, fmt: "Format") -> pq.Format:
+ def as_pq(cls, fmt: "PyFormat") -> pq.Format:
return _py2pg[fmt]
_py2pg = {
- Format.TEXT: pq.Format.TEXT,
- Format.BINARY: pq.Format.BINARY,
+ PyFormat.TEXT: pq.Format.TEXT,
+ PyFormat.BINARY: pq.Format.BINARY,
}
_pg2py = {
- pq.Format.TEXT: Format.TEXT,
- pq.Format.BINARY: Format.BINARY,
+ pq.Format.TEXT: PyFormat.TEXT,
+ pq.Format.BINARY: PyFormat.BINARY,
}
from . import errors as e
from .sql import Composable
from .proto import Query, Params
-from ._enums import Format
+from ._enums import PyFormat
if TYPE_CHECKING:
from .proto import Transformer
class QueryPart(NamedTuple):
pre: bytes
item: Union[int, str]
- format: Format
+ format: PyFormat
class PostgresQuery:
self.types: Tuple[int, ...] = ()
# The format requested by the user and the ones to really pass Postgres
- self._want_formats: Optional[List[Format]] = None
+ self._want_formats: Optional[List[PyFormat]] = None
self.formats: Optional[Sequence[pq.Format]] = None
self._parts: List[QueryPart]
@lru_cache()
def _query2pg(
query: Union[bytes, str], encoding: str
-) -> Tuple[bytes, List[Format], Optional[List[str]], List[QueryPart]]:
+) -> Tuple[bytes, List[PyFormat], Optional[List[str]], List[QueryPart]]:
"""
Convert Python query and params into something Postgres understands.
formats.append(part.format)
elif isinstance(parts[0].item, str):
- seen: Dict[str, Tuple[bytes, Format]] = {}
+ seen: Dict[str, Tuple[bytes, PyFormat]] = {}
order = []
for part in parts[:-1]:
assert isinstance(part.item, str)
pre, m = parts[i]
if m is None:
# last part
- rv.append(QueryPart(pre, 0, Format.AUTO))
+ rv.append(QueryPart(pre, 0, PyFormat.AUTO))
break
ph = m.group(0)
_ph_to_fmt = {
- b"s": Format.AUTO,
- b"t": Format.TEXT,
- b"b": Format.BINARY,
+ b"s": PyFormat.AUTO,
+ b"t": PyFormat.TEXT,
+ b"b": PyFormat.BINARY,
}
from . import errors as e
from .oids import INVALID_OID
from .rows import Row, RowMaker
-from .proto import LoadFunc, AdaptContext
-from ._enums import Format
+from .proto import LoadFunc, AdaptContext, PyFormat
if TYPE_CHECKING:
from .pq.proto import PGresult
self._conn = None
# mapping class, fmt -> Dumper instance
- self._dumpers_cache: DefaultDict[Format, DumperCache] = defaultdict(
+ self._dumpers_cache: DefaultDict[PyFormat, DumperCache] = defaultdict(
dict
)
self._row_loaders = rc
def dump_sequence(
- self, params: Sequence[Any], formats: Sequence[Format]
+ self, params: Sequence[Any], formats: Sequence[PyFormat]
) -> Tuple[List[Any], Tuple[int, ...], Sequence[pq.Format]]:
ps: List[Optional[bytes]] = [None] * len(params)
ts = [INVALID_OID] * len(params)
return ps, tuple(ts), fs
- def get_dumper(self, obj: Any, format: Format) -> "Dumper":
+ def get_dumper(self, obj: Any, format: PyFormat) -> "Dumper":
"""
Return a Dumper instance to dump *obj*.
"""
from . import pq
from . import errors as e
-from ._enums import Format as Format
+from ._enums import PyFormat as PyFormat
from .oids import postgres_types
from .proto import AdaptContext, Buffer as Buffer
from ._cmodule import _psycopg
return b"'%s'" % esc.escape_string(value)
def get_key(
- self, obj: Any, format: Format
+ self, obj: Any, format: PyFormat
) -> Union[type, Tuple[type, ...]]:
"""Return an alternative key to upgrade the dumper to represent *obj*
"""
return self.cls
- def upgrade(self, obj: Any, format: Format) -> "Dumper":
+ def upgrade(self, obj: Any, format: PyFormat) -> "Dumper":
"""Return a new dumper to manage *obj*.
Once `Transformer.get_dumper()` has been notified that this Dumper
is cheap: a copy is made only on customisation.
"""
- _dumpers: Dict[Format, Dict[Union[type, str], Type["proto.Dumper"]]]
+ _dumpers: Dict[PyFormat, Dict[Union[type, str], Type["proto.Dumper"]]]
_loaders: List[Dict[int, Type["Loader"]]]
types: TypesRegistry
template._own_loaders = [False, False]
self.types = TypesRegistry(template.types)
else:
- self._dumpers = {fmt: {} for fmt in Format}
+ self._dumpers = {fmt: {} for fmt in PyFormat}
self._own_dumpers = _dumpers_owned.copy()
self._loaders = [{}, {}]
self._own_loaders = [True, True]
# Register the dumper both as its format and as auto
# so that the last dumper registered is used in auto (%s) format
- for fmt in (Format.from_pq(dumper.format), Format.AUTO):
+ for fmt in (PyFormat.from_pq(dumper.format), PyFormat.AUTO):
if not self._own_dumpers[fmt]:
self._dumpers[fmt] = self._dumpers[fmt].copy()
self._own_dumpers[fmt] = True
self._loaders[fmt][oid] = loader
- def get_dumper(self, cls: type, format: Format) -> Type["proto.Dumper"]:
+ def get_dumper(self, cls: type, format: PyFormat) -> Type["proto.Dumper"]:
"""
Return the dumper class for the given type and format.
raise e.ProgrammingError(
f"cannot adapt type {cls.__name__}"
- f" to format {Format(format).name}"
+ f" to format {PyFormat(format).name}"
)
def get_loader(
return cls
-_dumpers_owned = dict.fromkeys(Format, True)
-_dumpers_shared = dict.fromkeys(Format, False)
+_dumpers_owned = dict.fromkeys(PyFormat, True)
+_dumpers_shared = dict.fromkeys(PyFormat, False)
global_adapters = AdaptersMap(types=postgres_types)
from . import pq
from . import errors as e
from .pq import ExecStatus
-from .adapt import Format
+from .adapt import PyFormat
from .proto import ConnectionType, PQGen, Transformer
from .compat import create_task
from ._cmodule import _psycopg
for item in row:
if item is not None:
- dumper = tx.get_dumper(item, Format.TEXT)
+ dumper = tx.get_dumper(item, PyFormat.TEXT)
b = dumper.dump(item)
out += _dump_re.sub(_dump_sub, b)
else:
out += _pack_int2(len(row))
for item in row:
if item is not None:
- dumper = tx.get_dumper(item, Format.BINARY)
+ dumper = tx.get_dumper(item, PyFormat.BINARY)
b = dumper.dump(item)
out += _pack_int4(len(b))
out += b
from typing import TYPE_CHECKING
from . import pq
-from . import _enums
+from ._enums import PyFormat as PyFormat
from .compat import Protocol
if TYPE_CHECKING:
from .waiting import Wait, Ready
from .connection import BaseConnection
-# NOMERGE: change name of _enums.Format
-PyFormat = _enums.Format
-
# An object implementing the buffer protocol
Buffer = Union[bytes, bytearray, memoryview]
from typing import Any, Iterator, List, Optional, Sequence, Union
from .pq import Escaping
-from .adapt import Transformer, Format
+from .adapt import Transformer, PyFormat
from .proto import AdaptContext
def as_bytes(self, context: Optional[AdaptContext]) -> bytes:
tx = Transformer(context)
- dumper = tx.get_dumper(self._obj, Format.TEXT)
+ dumper = tx.get_dumper(self._obj, PyFormat.TEXT)
return dumper.quote(self._obj)
"""
- def __init__(self, name: str = "", format: Format = Format.AUTO):
+ def __init__(self, name: str = "", format: PyFormat = PyFormat.AUTO):
super().__init__(name)
if not isinstance(name, str):
raise TypeError(f"expected string as name, got {name!r}")
parts = []
if self._obj:
parts.append(repr(self._obj))
- if self._format != Format.AUTO:
- parts.append(f"format={Format(self._format).name}")
+ if self._format != PyFormat.AUTO:
+ parts.append(f"format={PyFormat(self._format).name}")
return f"{self.__class__.__name__}({', '.join(parts)})"
from .. import pq
from .. import errors as e
from ..oids import postgres_types, TEXT_OID, TEXT_ARRAY_OID, INVALID_OID
-from ..adapt import RecursiveDumper, RecursiveLoader, Format as Pg3Format
+from ..adapt import RecursiveDumper, RecursiveLoader, PyFormat
from ..proto import Dumper, AdaptContext, Buffer
from .._struct import pack_len, unpack_len
from .._typeinfo import TypeInfo
self.sub_dumper: Optional[Dumper] = None
self._types = context.adapters.types if context else postgres_types
- def get_key(self, obj: List[Any], format: Pg3Format) -> Tuple[type, ...]:
+ def get_key(self, obj: List[Any], format: PyFormat) -> Tuple[type, ...]:
item = self._find_list_element(obj)
if item is not None:
sd = self._tx.get_dumper(item, format)
else:
return (self.cls,)
- def upgrade(self, obj: List[Any], format: Pg3Format) -> "BaseListDumper":
+ def upgrade(self, obj: List[Any], format: PyFormat) -> "BaseListDumper":
item = self._find_list_element(obj)
if item is None:
# Empty lists can only be dumped as text if the type is unknown.
from .. import pq
from ..oids import TEXT_OID
-from ..adapt import Format, RecursiveDumper, RecursiveLoader
+from ..adapt import PyFormat, RecursiveDumper, RecursiveLoader
from ..proto import AdaptContext, Buffer
from .._struct import unpack_len
from .._typeinfo import CompositeInfo as CompositeInfo # exported here
parts.append(sep)
continue
- dumper = self._tx.get_dumper(item, Format.from_pq(self.format))
+ dumper = self._tx.get_dumper(item, PyFormat.from_pq(self.format))
ad = dumper.dump(item)
if not ad:
ad = b'""'
from ..pq import Format
from .._tz import get_tzinfo
from ..oids import postgres_types as builtins
-from ..adapt import Buffer, Dumper, Loader, Format as Pg3Format
+from ..adapt import Buffer, Dumper, Loader, PyFormat
from ..proto import AdaptContext
from ..errors import InterfaceError, DataError
from .._struct import pack_int4, pack_int8, unpack_int4, unpack_int8
class _BaseTimeDumper(Dumper):
- def get_key(
- self, obj: time, format: Pg3Format
- ) -> Union[type, Tuple[type]]:
+ def get_key(self, obj: time, format: PyFormat) -> Union[type, Tuple[type]]:
# Use (cls,) to report the need to upgrade to a dumper for timetz (the
# Frankenstein of the data types).
if not obj.tzinfo:
else:
return (self.cls,)
- def upgrade(self, obj: time, format: Pg3Format) -> Dumper:
+ def upgrade(self, obj: time, format: PyFormat) -> Dumper:
raise NotImplementedError
_oid = builtins["time"].oid
- def upgrade(self, obj: time, format: Pg3Format) -> Dumper:
+ def upgrade(self, obj: time, format: PyFormat) -> Dumper:
if not obj.tzinfo:
return self
else:
)
return pack_int8(us)
- def upgrade(self, obj: time, format: Pg3Format) -> Dumper:
+ def upgrade(self, obj: time, format: PyFormat) -> Dumper:
if not obj.tzinfo:
return self
else:
class _BaseDatetimeDumper(Dumper):
def get_key(
- self, obj: datetime, format: Pg3Format
+ self, obj: datetime, format: PyFormat
) -> Union[type, Tuple[type]]:
# Use (cls,) to report the need to upgrade (downgrade, actually) to a
# dumper for naive timestamp.
else:
return (self.cls,)
- def upgrade(self, obj: datetime, format: Pg3Format) -> Dumper:
+ def upgrade(self, obj: datetime, format: PyFormat) -> Dumper:
raise NotImplementedError
_oid = builtins["timestamptz"].oid
- def upgrade(self, obj: datetime, format: Pg3Format) -> Dumper:
+ def upgrade(self, obj: datetime, format: PyFormat) -> Dumper:
if obj.tzinfo:
return self
else:
)
return pack_int8(micros)
- def upgrade(self, obj: datetime, format: Pg3Format) -> Dumper:
+ def upgrade(self, obj: datetime, format: PyFormat) -> Dumper:
if obj.tzinfo:
return self
else:
from .. import errors as e
from ..pq import Format
from ..oids import postgres_types as builtins
-from ..adapt import Buffer, Dumper, Loader
-from ..adapt import Format as Pg3Format
+from ..adapt import Buffer, Dumper, Loader, PyFormat
from ..proto import AdaptContext
from .._struct import pack_int2, pack_uint2, unpack_int2
from .._struct import pack_int4, pack_uint4, unpack_int4, unpack_uint4
" dump() is not supposed to be called"
)
- def get_key(self, obj: int, format: Pg3Format) -> type:
+ def get_key(self, obj: int, format: PyFormat) -> type:
return self.upgrade(obj, format).cls
_int2_dumper = Int2Dumper(Int2)
_int8_dumper = Int8Dumper(Int8)
_int_numeric_dumper = IntNumericDumper(IntNumeric)
- def upgrade(self, obj: int, format: Pg3Format) -> Dumper:
+ def upgrade(self, obj: int, format: PyFormat) -> Dumper:
if -(2 ** 31) <= obj < 2 ** 31:
if -(2 ** 15) <= obj < 2 ** 15:
return self._int2_dumper
from ..pq import Format
from ..oids import postgres_types as builtins, INVALID_OID
-from ..adapt import RecursiveDumper, RecursiveLoader, Format as Pg3Format
+from ..adapt import RecursiveDumper, RecursiveLoader, PyFormat
from ..proto import Dumper, AdaptContext, Buffer
from .._struct import pack_len, unpack_len
from .._typeinfo import RangeInfo as RangeInfo # exported here
super().__init__(cls, context)
self.sub_dumper: Optional[Dumper] = None
self._types = context.adapters.types if context else builtins
- self._adapt_format = Pg3Format.from_pq(self.format)
+ self._adapt_format = PyFormat.from_pq(self.format)
def get_key(
- self, obj: Range[Any], format: Pg3Format
+ self, obj: Range[Any], format: PyFormat
) -> Union[type, Tuple[type, ...]]:
# If we are a subclass whose oid is specified we don't need upgrade
if self.oid != INVALID_OID:
else:
return (self.cls,)
- def upgrade(self, obj: Range[Any], format: Pg3Format) -> "BaseRangeDumper":
+ def upgrade(self, obj: Range[Any], format: PyFormat) -> "BaseRangeDumper":
# If we are a subclass whose oid is specified we don't need upgrade
if self.oid != INVALID_OID:
return self
if type(item) is int:
# postgres won't cast int4range -> int8range so we must use
# text format and unknown oid here
- sd = self._tx.get_dumper(item, Pg3Format.TEXT)
+ sd = self._tx.get_dumper(item, PyFormat.TEXT)
dumper = RangeDumper(self.cls, self._tx)
dumper.sub_dumper = sd
dumper.oid = INVALID_OID
from psycopg import pq
from psycopg import proto
from psycopg.rows import Row, RowMaker
-from psycopg.adapt import Loader, AdaptersMap, Format
+from psycopg.adapt import Loader, AdaptersMap, PyFormat
from psycopg.proto import Dumper
from psycopg.pq.proto import PGconn, PGresult
from psycopg.connection import BaseConnection
self, types: Sequence[int], formats: Sequence[pq.Format]
) -> None: ...
def dump_sequence(
- self, params: Sequence[Any], formats: Sequence[Format]
+ self, params: Sequence[Any], formats: Sequence[PyFormat]
) -> Tuple[List[Any], Tuple[int, ...], Sequence[pq.Format]]: ...
- def get_dumper(self, obj: Any, format: Format) -> Dumper: ...
+ def get_dumper(self, obj: Any, format: PyFormat) -> Dumper: ...
def load_rows(
self, row0: int, row1: int, make_row: RowMaker[Row]
) -> List[Row]: ...
from psycopg_c._psycopg cimport oids
from psycopg.pq import Format as _pq_Format
-from psycopg._enums import Format as _pg_Format
+from psycopg._enums import PyFormat as _py_Format
PQ_TEXT = _pq_Format.TEXT
PQ_BINARY = _pq_Format.BINARY
-PG_AUTO = _pg_Format.AUTO
-PG_TEXT = _pg_Format.TEXT
-PG_BINARY = _pg_Format.BINARY
+PG_AUTO = _py_Format.AUTO
+PG_TEXT = _py_Format.TEXT
+PG_BINARY = _py_Format.BINARY
include "_psycopg/adapt.pyx"
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
from psycopg import errors as e
-from psycopg._enums import Format as Pg3Format
from psycopg.pq import Format as PqFormat
from psycopg.rows import Row, RowMaker
import psycopg
from psycopg import sql
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat
from psycopg.types.range import Range
from psycopg.types.numeric import Int4, Int8
def __init__(self, connection):
self.conn = connection
- self._format = Format.BINARY
+ self._format = PyFormat.BINARY
self.records = []
self._schema = None
(dt.datetime, False),
]
# TODO: learn to dump numeric ranges in binary
- if self.format != Format.BINARY:
+ if self.format != PyFormat.BINARY:
subtypes.extend([Int4, Int8])
return (cls, choice(subtypes))
def make_Range(self, spec):
# TODO: drop format check after fixing binary dumping of empty ranges
- if random() < 0.02 and spec[0] is Range and self.format == Format.TEXT:
+ if (
+ random() < 0.02
+ and spec[0] is Range
+ and self.format == PyFormat.TEXT
+ ):
return spec[0](empty=True)
while True:
# avoid generating ranges with no type info if dumping in binary
# TODO: lift this limitation after test_copy_in_empty xfail is fixed
- if spec[0] is Range and self.format == Format.BINARY:
+ if spec[0] is Range and self.format == PyFormat.BINARY:
if bounds[0] is bounds[1] is None:
continue
import psycopg
from psycopg import pq, sql
-from psycopg.adapt import Transformer, Format, Dumper, Loader
+from psycopg.adapt import Transformer, PyFormat as Format, Dumper, Loader
from psycopg.oids import postgres_types as builtins, TEXT_OID
from psycopg._cmodule import _psycopg
from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
-from psycopg.adapt import Format as PgFormat
+from psycopg.adapt import PyFormat as PgFormat
from psycopg.types.numeric import Int4
from .utils import gc_collect
from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
-from psycopg.adapt import Format as PgFormat
+from psycopg.adapt import PyFormat as PgFormat
from .utils import gc_collect
from .test_copy import sample_text, sample_binary, sample_binary_rows # noqa
import psycopg
from psycopg import pq, sql, rows
from psycopg.oids import postgres_types as builtins
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
from .utils import gc_collect
import psycopg
from psycopg import pq, sql, rows
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
from .utils import gc_collect
from .test_cursor import my_row_factory
import psycopg
from psycopg import pq
-from psycopg.adapt import Transformer, Format
+from psycopg.adapt import Transformer, PyFormat as Format
from psycopg._queries import PostgresQuery, _split_query
import pytest
from psycopg import sql, ProgrammingError
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
@pytest.mark.parametrize(
from psycopg import pq
from psycopg import sql
from psycopg.oids import postgres_types as builtins
-from psycopg.adapt import Format, Transformer
+from psycopg.adapt import PyFormat as Format, Transformer
from psycopg.types import TypeInfo
from psycopg import pq
from psycopg import sql
from psycopg.oids import postgres_types as builtins
-from psycopg.adapt import Transformer, Format
+from psycopg.adapt import Transformer, PyFormat as Format
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
from psycopg import pq
from psycopg.sql import Identifier
from psycopg.oids import postgres_types as builtins
-from psycopg.adapt import Format, global_adapters
+from psycopg.adapt import PyFormat as Format, global_adapters
from psycopg.types.composite import CompositeInfo
import pytest
from psycopg import DataError, pq, sql
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
class TestDate:
import psycopg.types
from psycopg import pq
from psycopg import sql
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
from psycopg.types.json import Json, Jsonb, set_json_dumps, set_json_loads
samples = [
from psycopg import pq
from psycopg import sql
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
from psycopg import sql
-from psycopg.adapt import Transformer, Format
+from psycopg.adapt import Transformer, PyFormat as Format
def test_quote_none(conn):
from psycopg import pq
from psycopg import sql
-from psycopg.adapt import Transformer, Format
+from psycopg.adapt import Transformer, PyFormat as Format
from psycopg.types.numeric import FloatLoader
import psycopg.errors
from psycopg import pq
from psycopg.sql import Identifier
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
from psycopg.types.range import Range, RangeInfo
import psycopg
from psycopg import pq
from psycopg import sql
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
from psycopg import Binary
eur = "\u20ac"
from psycopg import pq
from psycopg import sql
-from psycopg.adapt import Format
+from psycopg.adapt import PyFormat as Format
@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
from typing import Any, Callable, Optional, Sequence, Tuple
from psycopg import AnyCursor, Connection, Cursor, ServerCursor, connect
-from psycopg import pq, adapt
-from psycopg.proto import Dumper, AdaptContext
+from psycopg import pq
+from psycopg.proto import Dumper, AdaptContext, PyFormat
def int_row_factory(cursor: AnyCursor[int]) -> Callable[[Sequence[int]], int]:
esc = pq.Escaping()
return b"'%s'" % esc.escape_string(value.replace(b"h", b"q"))
- def get_key(self, obj: str, format: adapt.Format) -> type:
+ def get_key(self, obj: str, format: PyFormat) -> type:
return self.cls
- def upgrade(self, obj: str, format: adapt.Format) -> "MyStrDumper":
+ def upgrade(self, obj: str, format: PyFormat) -> "MyStrDumper":
return self