A format can be enforced using %b or %t placeholders.
Use binary formats dumpers if available, otherwise use text dumpers.
Python str are an exception: prefer text dumper, because it will dump
with format 0, so Postgres can cast the value to any format.
Special-case empty lists to be always dumped as textual unknown '{}' so
that Postgres can cast them into whatever array it fancies.
:type context: `~psycopg3.Connection`, `~psycopg3.Cursor`, or `Transformer`
.. attribute:: format
- :type: Format
+ :type: pq.Format
The format this class dumps, `~Format.TEXT` or `~Format.BINARY`.
This is a class attribute.
.. seealso:: :pq:`PQresultStatus` for a description of these states.
+.. autoclass:: Format
+ :members:
+
+
.. autoclass:: DiagnosticField
Available attributes:
--- /dev/null
+"""
+Enum values for psycopg3
+
+These values are defined by us and are not necessarily dependent on
+libpq-defined enums.
+"""
+
+# Copyright (C) 2020-2021 The Psycopg Team
+
+from enum import Enum
+
+from . import pq
+
+
+class Format(str, Enum):
+ """
+ Enum representing the format wanted for a query argument.
+
+ The value `AUTO` allows psycopg3 to choose the best value for a certain
+ value.
+ """
+
+ __module__ = "psycopg3.adapt"
+
+ AUTO = "s"
+ """Automatically chosen (``%s`` placeholder)."""
+ TEXT = "t"
+ """Text parameter (``%t`` placeholder)."""
+ BINARY = "b"
+ """Binary parameter (``%b`` placeholder)."""
+
+ @classmethod
+ def from_pq(cls, fmt: pq.Format) -> "Format":
+ return _pg2py[fmt]
+
+ @classmethod
+ def as_pq(cls, fmt: "Format") -> pq.Format:
+ return _py2pg[fmt]
+
+
+_py2pg = {
+ Format.TEXT: pq.Format.TEXT,
+ Format.BINARY: pq.Format.BINARY,
+}
+
+_pg2py = {
+ pq.Format.TEXT: Format.TEXT,
+ pq.Format.BINARY: Format.BINARY,
+}
from typing import Sequence, Tuple, Union, TYPE_CHECKING
from functools import lru_cache
+from . import pq
from . import errors as e
-from .pq import Format
from .sql import Composable
from .proto import Query, Params
+from ._enums import Format
if TYPE_CHECKING:
from .proto import Transformer
# The format requested by the user and the ones to really pass Postgres
self._want_formats: Optional[List[Format]] = None
- self.formats: Optional[Sequence[Format]] = None
+ self.formats: Optional[Sequence[pq.Format]] = None
self._parts: List[QueryPart]
self.query = b""
pre, m = parts[i]
if m is None:
# last part
- rv.append(QueryPart(pre, 0, Format.TEXT))
+ rv.append(QueryPart(pre, 0, Format.AUTO))
break
ph = m.group(0)
"incomplete placeholder: '%'; if you want to use '%' as an"
" operator you can double it up, i.e. use '%%'"
)
- elif ph[-1:] not in b"bs":
+ elif ph[-1:] not in b"sbt":
raise e.ProgrammingError(
- f"only '%s' and '%b' placeholders allowed, got"
+ f"only '%s', '%b', '%t' placeholders allowed, got"
f" {m.group(0).decode(encoding)}"
)
"positional and named placeholders cannot be mixed"
)
- # Binary format
- format = Format(ph[-1:] == b"b")
-
+ format = _ph_to_fmt[ph[-1:]]
rv.append(QueryPart(pre, item, format))
i += 1
return rv
+
+
+_ph_to_fmt = {
+ b"s": Format.AUTO,
+ b"t": Format.TEXT,
+ b"b": Format.BINARY,
+}
# Copyright (C) 2020 The Psycopg Team
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, Union
-from typing import cast, TYPE_CHECKING
+from typing import cast, DefaultDict, TYPE_CHECKING
+from collections import defaultdict
+from . import pq
from . import errors as e
-from .pq import Format
from .oids import INVALID_OID
from .proto import LoadFunc, AdaptContext
+from ._enums import Format
if TYPE_CHECKING:
from .pq.proto import PGresult
self._connection = None
# mapping class, fmt -> Dumper instance
- self._dumpers_cache: Tuple[DumperCache, DumperCache] = ({}, {})
+ self._dumpers_cache: DefaultDict[Format, DumperCache] = defaultdict(
+ dict
+ )
# mapping oid, fmt -> Loader instance
self._loaders_cache: Tuple[LoaderCache, LoaderCache] = ({}, {})
rc.append(self.get_loader(oid, fmt).load) # type: ignore
def set_row_types(
- self, types: Sequence[int], formats: Sequence[Format]
+ self, types: Sequence[int], formats: Sequence[pq.Format]
) -> None:
rc: List[LoadFunc] = [None] * len(types) # type: ignore[list-item]
for i in range(len(rc)):
def dump_sequence(
self, params: Sequence[Any], formats: Sequence[Format]
- ) -> Tuple[List[Any], Tuple[int, ...], Sequence[Format]]:
+ ) -> Tuple[List[Any], Tuple[int, ...], Sequence[pq.Format]]:
ps: List[Optional[bytes]] = [None] * len(params)
ts = [INVALID_OID] * len(params)
+ fs: List[pq.Format] = [pq.Format.TEXT] * len(params)
dumpers = self._row_dumpers
if not dumpers:
dumper = dumpers[i] = self.get_dumper(param, formats[i])
ps[i] = dumper.dump(param)
ts[i] = dumper.oid
+ fs[i] = dumper.format
- return ps, tuple(ts), formats
+ return ps, tuple(ts), fs
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
# Fast path: return a Dumper class already instantiated from the same type
subobj = self._find_list_element(obj)
key = (cls, type(subobj))
+ cache = self._dumpers_cache[format]
try:
- return self._dumpers_cache[format][key]
+ return cache[key]
except KeyError:
pass
+ # When dumping a string with %s we may refer to any type actually,
+ # but the user surely passed a text format
+ if cls is str and format == Format.AUTO:
+ format = Format.TEXT
+
+ sub_dumper = None
+ if cls is list:
+ # It's not possible to declare an empty unknown array, so force text
+ if subobj is None:
+ format = Format.TEXT
+
+ # If we are dumping a list it's the sub-object which should dictate
+ # what format to use.
+ else:
+ sub_dumper = self.get_dumper(subobj, format)
+ format = Format.from_pq(sub_dumper.format)
+
dcls = self._adapters.get_dumper(cls, format)
if not dcls:
raise e.ProgrammingError(
)
d = dcls(cls, self)
- if cls is list:
- if subobj is not None:
- sub_dumper = self.get_dumper(subobj, format)
- cast("BaseListDumper", d).set_sub_dumper(sub_dumper)
- elif format == Format.TEXT:
- # Special case dumping an empty list (or containing no None
- # element). In text mode we cast them as unknown, so that
- # postgres can cast them automatically to something useful.
- # In binary we cannot do it, it doesn't seem there is a
- # representation for unknown array, so let's dump it as text[].
- # This means that placeholders receiving a binary array should
- # be almost always cast to the target type.
- d.oid = INVALID_OID
-
- self._dumpers_cache[format][key] = d
+ if sub_dumper:
+ cast("BaseListDumper", d).set_sub_dumper(sub_dumper)
+
+ cache[key] = d
return d
def load_rows(self, row0: int, row1: int) -> List[Tuple[Any, ...]]:
for i, val in enumerate(record)
)
- def get_loader(self, oid: int, format: Format) -> "Loader":
+ def get_loader(self, oid: int, format: pq.Format) -> "Loader":
try:
return self._loaders_cache[format][oid]
except KeyError:
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Type, TypeVar, Union
from typing import cast, TYPE_CHECKING
+
from . import pq
from . import proto
-from .pq import Format as Format
+from ._enums import Format as Format
from .oids import builtins
from .proto import AdaptContext
Convert Python object of the type *cls* to PostgreSQL representation.
"""
- format: Format
+ format: pq.Format
# A class-wide oid, which will be used by default by instances unless
# the subclass overrides it in init.
Convert PostgreSQL objects with OID *oid* to Python objects.
"""
- format: Format
+ format: pq.Format
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
self.oid = oid
Return None if not found.
"""
- dumpers = self._dumpers[format]
+ # TODO: auto selection
+ if format == Format.AUTO:
+ dmaps = [
+ self._dumpers[pq.Format.BINARY],
+ self._dumpers[pq.Format.TEXT],
+ ]
+ elif format == Format.BINARY:
+ dmaps = [self._dumpers[pq.Format.BINARY]]
+ elif format == Format.TEXT:
+ dmaps = [self._dumpers[pq.Format.TEXT]]
+ else:
+ raise ValueError(f"bad dumper format: {format}")
# Look for the right class, including looking at superclasses
for scls in cls.__mro__:
- if scls in dumpers:
- return dumpers[scls]
+ for dmap in dmaps:
+ if scls in dmap:
+ return dmap[scls]
# If the adapter is not found, look for its name as a string
fqn = scls.__module__ + "." + scls.__qualname__
- if fqn in dumpers:
- # Replace the class name with the class itself
- d = dumpers[scls] = dumpers.pop(fqn)
- return d
+ for dmap in dmaps:
+ if fqn in dmap:
+ # Replace the class name with the class itself
+ d = dmap[scls] = dmap.pop(fqn)
+ return d
return None
- def get_loader(self, oid: int, format: Format) -> Optional[Type[Loader]]:
+ def get_loader(
+ self, oid: int, format: pq.Format
+ ) -> Optional[Type[Loader]]:
"""
Return the loader class for the given oid and format.
from . import pq
from . import errors as e
-from .pq import Format, ExecStatus
+from .pq import ExecStatus
+from .adapt import Format
from .proto import ConnectionType, PQGen, Transformer
from .generators import copy_from, copy_to, copy_end
from .cursor import BaseCursor # noqa: F401
from .connection import Connection, AsyncConnection # noqa: F401
+TEXT = pq.Format.TEXT
+BINARY = pq.Format.BINARY
+
class BaseCopy(Generic[ConnectionType]):
def __init__(self, cursor: "BaseCursor[ConnectionType]"):
), "The Transformer doesn't have a PGresult set"
self._pgresult: "PGresult" = self.transformer.pgresult
- self.format = Format(self._pgresult.binary_tuples)
+ self.format = pq.Format(self._pgresult.binary_tuples)
self._encoding = self.connection.client_encoding
self._signature_sent = False
self._row_mode = False # true if the user is using send_row()
self._write_buffer_size = 32 * 1024
self._finished = False
- if self.format == Format.TEXT:
+ if self.format == TEXT:
self._format_row = format_row_text
self._parse_row = parse_row_text
else:
if not data:
return None
- if self.format == Format.BINARY:
+ if self.format == BINARY:
if not self._signature_sent:
if data[: len(_binary_signature)] != _binary_signature:
raise e.DataError(
# to take care of the end-of-copy marker too
self._row_mode = True
- if self.format == Format.BINARY and not self._signature_sent:
+ if self.format == BINARY and not self._signature_sent:
self._write_buffer += _binary_signature
self._signature_sent = True
)
return
- if self.format == Format.BINARY:
+ if self.format == BINARY:
# If we have sent no data we need to send the signature
# and the trailer
if not self._signature_sent:
return data
elif isinstance(data, str):
- if self._pgresult.binary_tuples == Format.BINARY:
+ if self._pgresult.binary_tuples == BINARY:
raise TypeError(
"cannot copy str data in binary mode: use bytes instead"
)
from math import floor
from typing import Any, Sequence
+from .pq import Format
from .oids import builtins
-from .adapt import Dumper, Format
+from .adapt import Dumper
class DBAPITypeObject:
class Format(IntEnum):
"""
Enum representing the format of a query argument or return value.
+
+ These values are only the ones managed by the libpq. `~psycopg3` may also
+ support automatically-chosen values: see `psycopg3.adapt.Format`.
"""
- __module__ = "psycopg3.adapt"
+ __module__ = "psycopg3.pq"
TEXT = 0
"""Text parameter."""
from typing_extensions import Protocol
from . import pq
-from .pq import Format
+from ._enums import Format
if TYPE_CHECKING:
from .connection import BaseConnection
...
def set_row_types(
- self, types: Sequence[int], formats: Sequence[Format]
+ self, types: Sequence[int], formats: Sequence[pq.Format]
) -> None:
...
def dump_sequence(
self, params: Sequence[Any], formats: Sequence[Format]
- ) -> Tuple[List[Any], Tuple[int, ...], Sequence[Format]]:
+ ) -> Tuple[List[Any], Tuple[int, ...], Sequence[pq.Format]]:
...
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
) -> Tuple[Any, ...]:
...
- def get_loader(self, oid: int, format: Format) -> "Loader":
+ def get_loader(self, oid: int, format: pq.Format) -> "Loader":
...
import string
from typing import Any, Iterator, List, Optional, Sequence, Union
-from .pq import Escaping, Format
-from .adapt import Transformer
+from .pq import Escaping
+from .adapt import Transformer, Format
from .proto import AdaptContext
"""
- def __init__(self, name: str = "", format: Format = Format.TEXT):
+ def __init__(self, name: str = "", format: Format = Format.AUTO):
super().__init__(name)
if not isinstance(name, str):
raise TypeError(f"expected string as name, got {name!r}")
parts = []
if self._obj:
parts.append(repr(self._obj))
- if self._format != Format.TEXT:
+ if self._format != Format.AUTO:
parts.append(f"format={Format(self._format).name}")
return f"{self.__class__.__name__}({', '.join(parts)})"
def as_string(self, context: Optional[AdaptContext]) -> str:
- code = "s" if self._format == Format.TEXT else "b"
+ code = self._format
return f"%({self._obj}){code}" if self._obj else f"%{code}"
def as_bytes(self, context: Optional[AdaptContext]) -> bytes:
import struct
from typing import Any, Iterator, List, Optional, Type
+from .. import pq
+from .._enums import Format
from .. import errors as e
-from ..oids import builtins, TEXT_OID, TEXT_ARRAY_OID
-from ..adapt import Format, Dumper, Loader, Transformer
+from ..oids import builtins, TEXT_OID, TEXT_ARRAY_OID, INVALID_OID
+from ..adapt import Dumper, Loader, Transformer
from ..proto import AdaptContext
def __init__(self, cls: type, context: Optional[AdaptContext] = None):
super().__init__(cls, context)
tx = Transformer(context)
- self.set_sub_dumper(tx.get_dumper("", self.format))
+ fmt = Format.from_pq(self.format)
+ self.set_sub_dumper(tx.get_dumper("", fmt))
def set_sub_dumper(self, dumper: Dumper) -> None:
self.sub_dumper = dumper
- self.oid = self._get_array_oid(dumper.oid)
- self.sub_oid = dumper.oid or TEXT_OID
+ # We consider an array of unknowns as unknown, so we can dump empty
+ # lists or lists containing only None elements. However Postgres won't
+ # take unknown for element oid (in binary; in text it doesn't matter)
+ if dumper.oid != INVALID_OID:
+ self.oid = self._get_array_oid(dumper.oid)
+ self.sub_oid = dumper.oid
+ else:
+ self.oid = INVALID_OID
+ self.sub_oid = TEXT_OID
def _get_array_oid(self, base_oid: int) -> int:
"""
class ListDumper(BaseListDumper):
- format = Format.TEXT
+ format = pq.Format.TEXT
# from https://www.postgresql.org/docs/current/arrays.html#ARRAYS-IO
#
class ListBinaryDumper(BaseListDumper):
- format = Format.BINARY
+ format = pq.Format.BINARY
def dump(self, obj: List[Any]) -> bytes:
if not obj:
class ArrayLoader(BaseArrayLoader):
- format = Format.TEXT
+ format = pq.Format.TEXT
# Tokenize an array representation into item and brackets
# TODO: currently recognise only , as delimiter. Should be configured
def load(self, data: bytes) -> List[Any]:
rv = None
stack: List[Any] = []
- cast = self._tx.get_loader(self.base_oid, Format.TEXT).load
+ cast = self._tx.get_loader(self.base_oid, self.format).load
for m in self._re_parse.finditer(data):
t = m.group(1)
class ArrayBinaryLoader(BaseArrayLoader):
- format = Format.BINARY
+ format = pq.Format.BINARY
def load(self, data: bytes) -> List[Any]:
ndims, hasnull, oid = _struct_head.unpack_from(data[:12])
if not ndims:
return []
- fcast = self._tx.get_loader(oid, Format.BINARY).load
+ fcast = self._tx.get_loader(oid, self.format).load
p = 12 + 8 * ndims
dims = [
name = f"oid{base_oid}"
for base in (ArrayLoader, ArrayBinaryLoader):
- lname = f"{name.title()}Array{'Binary' if format else ''}Loader"
+ fmt = "Binary" if base.format == pq.Format.BINARY else ""
+ lname = f"{name.title()}Array{fmt}Loader"
loader: Type[Loader] = type(lname, (base,), {"base_oid": base_oid})
loader.register(array_oid, context=context)
from typing import Any, Callable, Iterator, List, NamedTuple, Optional
from typing import Sequence, Tuple, Type, Union, TYPE_CHECKING
+from .. import pq
from .. import sql
from .. import errors as e
from ..oids import TypeInfo, TEXT_OID
) -> Optional["CompositeInfo"]:
if isinstance(name, sql.Composable):
name = name.as_string(conn)
- cur = conn.cursor(format=Format.BINARY)
+ cur = conn.cursor()
cur.execute(cls._info_query, {"name": name})
recs = cur.fetchall()
return cls._from_records(recs)
) -> Optional["CompositeInfo"]:
if isinstance(name, sql.Composable):
name = name.as_string(conn)
- cur = await conn.cursor(format=Format.BINARY)
+ cur = await conn.cursor()
await cur.execute(cls._info_query, {"name": name})
recs = await cur.fetchall()
return cls._from_records(recs)
class SequenceDumper(Dumper):
- format = Format.TEXT
+ format = pq.Format.TEXT
def __init__(self, cls: type, context: Optional[AdaptContext] = None):
super().__init__(cls, context)
parts.append(sep)
continue
- dumper = self._tx.get_dumper(item, Format.TEXT)
+ dumper = self._tx.get_dumper(item, Format.from_pq(self.format))
ad = dumper.dump(item)
if not ad:
ad = b'""'
class BaseCompositeLoader(Loader):
- format = Format.TEXT
+ format = pq.Format.TEXT
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
super().__init__(oid, context)
if data == b"()":
return ()
- cast = self._tx.get_loader(TEXT_OID, format=Format.TEXT).load
+ cast = self._tx.get_loader(TEXT_OID, self.format).load
return tuple(
cast(token) if token is not None else None
for token in self._parse_record(data[1:-1])
class RecordBinaryLoader(Loader):
- format = Format.BINARY
+ format = pq.Format.BINARY
_types_set = False
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
def _config_types(self, data: bytes) -> None:
oids = [r[0] for r in self._walk_record(data)]
- self._tx.set_row_types(oids, [Format.BINARY] * len(oids))
+ self._tx.set_row_types(oids, [pq.Format.BINARY] * len(oids))
class CompositeLoader(RecordLoader):
- format = Format.TEXT
+ format = pq.Format.TEXT
factory: Callable[..., Any]
fields_types: List[int]
_types_set = False
def _config_types(self, data: bytes) -> None:
self._tx.set_row_types(
- self.fields_types, [Format.TEXT] * len(self.fields_types)
+ self.fields_types, [pq.Format.TEXT] * len(self.fields_types)
)
class CompositeBinaryLoader(RecordBinaryLoader):
- format = Format.BINARY
+ format = pq.Format.BINARY
factory: Callable[..., Any]
def load(self, data: bytes) -> Any:
from datetime import date, datetime, time, timedelta
from typing import cast, Optional
+from ..pq import Format
from ..oids import builtins
-from ..adapt import Dumper, Loader, Format
+from ..adapt import Dumper, Loader
from ..proto import AdaptContext
from ..errors import InterfaceError, DataError
import json
from typing import Any, Callable, Optional
+from ..pq import Format
from ..oids import builtins
-from ..adapt import Dumper, Loader, Format
+from ..adapt import Dumper, Loader
from ..errors import DataError
JsonDumpsFunction = Callable[[Any], str]
from typing import Callable, Optional, Union, TYPE_CHECKING
+from ..pq import Format
from ..oids import builtins
-from ..adapt import Dumper, Loader, Format
+from ..adapt import Dumper, Loader
from ..proto import AdaptContext
if TYPE_CHECKING:
from typing import Any, Callable, Dict, Tuple, cast
from decimal import Decimal
+from ..pq import Format
from ..oids import builtins
-from ..adapt import Dumper, Loader, Format
+from ..adapt import Dumper, Loader
_PackInt = Callable[[int], bytes]
_PackFloat = Callable[[float], bytes]
from .. import sql
from .. import errors as e
+from ..pq import Format
from ..oids import builtins, TypeInfo
-from ..adapt import Format, Dumper, Loader
+from ..adapt import Dumper, Loader
from ..proto import AdaptContext
from . import array
# Copyright (C) 2020 The Psycopg Team
+from ..pq import Format
from ..oids import builtins
-from ..adapt import Dumper, Loader, Format
+from ..adapt import Dumper, Loader
class BoolDumper(Dumper):
from typing import Optional, Union, TYPE_CHECKING
-from ..pq import Escaping
+from ..pq import Format, Escaping
from ..oids import builtins
-from ..adapt import Dumper, Loader, Format
+from ..adapt import Dumper, Loader
from ..proto import AdaptContext
from ..errors import DataError
class StringBinaryDumper(_StringDumper):
format = Format.BINARY
+ _oid = builtins["text"].oid
def dump(self, obj: str) -> bytes:
# the server will raise DataError subclass if the string contains 0x00
from typing import Callable, Optional, TYPE_CHECKING
+from ..pq import Format
from ..oids import builtins
-from ..adapt import Dumper, Loader, Format
+from ..adapt import Dumper, Loader
from ..proto import AdaptContext
if TYPE_CHECKING:
from typing import Any, Iterable, List, Optional, Sequence, Tuple
from psycopg3 import proto
-from psycopg3.adapt import Dumper, Loader, AdaptersMap
+from psycopg3.adapt import Dumper, Loader, AdaptersMap, Format
from psycopg3.connection import BaseConnection
-from psycopg3.pq import Format
+from psycopg3 import pq
from psycopg3.pq.proto import PGconn, PGresult
class Transformer(proto.AdaptContext):
@pgresult.setter
def pgresult(self, result: Optional[PGresult]) -> None: ...
def set_row_types(
- self, types: Sequence[int], formats: Sequence[Format]
+ self, types: Sequence[int], formats: Sequence[pq.Format]
) -> None: ...
def dump_sequence(
self, params: Sequence[Any], formats: Sequence[Format]
- ) -> Tuple[List[Any], Tuple[int, ...], Sequence[Format]]: ...
+ ) -> Tuple[List[Any], Tuple[int, ...], Sequence[pq.Format]]: ...
def get_dumper(self, obj: Any, format: Format) -> Dumper: ...
def load_rows(self, row0: int, row1: int) -> List[Tuple[Any, ...]]: ...
def load_row(self, row: int) -> Optional[Tuple[Any, ...]]: ...
def load_sequence(
self, record: Sequence[Optional[bytes]]
) -> Tuple[Any, ...]: ...
- def get_loader(self, oid: int, format: Format) -> Loader: ...
+ def get_loader(self, oid: int, format: pq.Format) -> Loader: ...
# Generators
def connect(conninfo: str) -> proto.PQGenConn[PGconn]: ...
from psycopg3_c.pq cimport libpq
from psycopg3_c._psycopg3 cimport oids
-from psycopg3.pq import Format
+from psycopg3.pq import Format as _pq_Format
+from psycopg3._enums import Format as _pg_Format
-FORMAT_TEXT = Format.TEXT
-FORMAT_BINARY = Format.BINARY
+PQ_TEXT = _pq_Format.TEXT
+PQ_BINARY = _pq_Format.BINARY
+
+PG_AUTO = _pg_Format.AUTO
+PG_TEXT = _pg_Format.TEXT
+PG_BINARY = _pg_Format.BINARY
include "_psycopg3/adapt.pyx"
from psycopg3_c.pq cimport _buffer_as_string_and_size
from psycopg3 import errors as e
-from psycopg3.pq import Format
from psycopg3.pq.misc import error_message
import logging
this_cls,
cls: Union[type, str],
context: Optional[AdaptContext] = None,
- int format = Format.TEXT,
+ int format = PQ_TEXT,
) -> None:
if context is not None:
adapters = context.adapters
cls,
oid: Union[int, str],
context: Optional["AdaptContext"] = None,
- int format = Format.TEXT,
+ int format = PQ_TEXT,
) -> None:
if isinstance(oid, str):
from psycopg3.oids import builtins
cdef uint32_t besize
cdef char *buf
cdef int i
- fmt = FORMAT_BINARY
+ fmt = PG_BINARY
for i in range(rowlen):
item = row[i]
cdef unsigned char *target
cdef int nesc = 0
cdef int with_tab
- fmt = FORMAT_TEXT
+ fmt = PG_TEXT
for i in range(rowlen):
# Include the tab before the data, so it gets included in the resizes
from cpython.list cimport (
PyList_New, PyList_CheckExact,
PyList_GET_ITEM, PyList_SET_ITEM, PyList_GET_SIZE)
+from cpython.bytes cimport PyBytes_AS_STRING
from cpython.tuple cimport PyTuple_New, PyTuple_SET_ITEM
from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
from psycopg3 import errors as e
-
+from psycopg3._enums import Format as Pg3Format
+from psycopg3.pq import Format as PqFormat
# internal structure: you are not supposed to know this. But it's worth some
# 10% of the innermost loop, so I'm willing to ask for forgiveness later...
cdef class RowDumper:
cdef object dumpfunc
cdef object oid
+ cdef object format
cdef CDumper cdumper
cdef readonly object connection
cdef readonly object adapters
+ cdef dict _auto_dumpers
cdef dict _text_dumpers
cdef dict _binary_dumpers
cdef dict _text_loaders
self.adapters = global_adapters
self.connection = None
- # mapping class -> Dumper instance (text, binary)
- self._text_dumpers = {}
- self._binary_dumpers = {}
+ # mapping class -> Dumper instance (auto, text, binary)
+ self._auto_dumpers = None
+ self._text_dumpers = None
+ self._binary_dumpers = None
# mapping oid -> Loader instance (text, binary)
self._text_loaders = {}
cpdef object get_dumper(self, object obj, object format):
# Fast path: return a Dumper class already instantiated from the same type
- cdef dict cache
+ cdef PyObject *cache
cdef PyObject *ptr
cls = type(obj)
subobj = self._find_list_element(obj, set())
key = (cls, type(subobj))
- cache = self._binary_dumpers if format else self._text_dumpers
- ptr = PyDict_GetItem(cache, key)
+ bfmt = PyUnicode_AsUTF8String(format)
+ cdef char cfmt = PyBytes_AS_STRING(bfmt)[0]
+ if cfmt == b's':
+ if self._auto_dumpers is None:
+ self._auto_dumpers = {}
+ cache = <PyObject *>self._auto_dumpers
+ elif cfmt == b'b':
+ if self._binary_dumpers is None:
+ self._binary_dumpers = {}
+ cache = <PyObject *>self._binary_dumpers
+ elif cfmt == b't':
+ if self._text_dumpers is None:
+ self._text_dumpers = {}
+ cache = <PyObject *>self._text_dumpers
+ else:
+ raise ValueError(
+ f"format should be a psycopg3.adapt.Format, not {format}")
+
+ ptr = PyDict_GetItem(<object>cache, key)
if ptr != NULL:
return <object>ptr
+ # When dumping a string with %s we may refer to any type actually,
+ # but the user surely passed a text format
+ if cls is str and cfmt == b's':
+ format = PG_TEXT
+
+ sub_dumper = None
+ if cls is list:
+ # It's not possible to declare an empty unknown array, so force text
+ if subobj is None:
+ format = PG_TEXT
+
+ # If we are dumping a list it's the sub-object which should dictate
+ # what format to use.
+ else:
+ sub_dumper = self.get_dumper(subobj, format)
+ format = Pg3Format.from_pq(sub_dumper.format)
+
dcls = PyObject_CallFunctionObjArgs(
self.adapters.get_dumper, <PyObject *>cls, <PyObject *>format, NULL)
if dcls is None:
raise e.ProgrammingError(
f"cannot adapt type {cls.__name__}"
- f" to format {Format(format).name}")
+ f" to format {Pg3Format(format).name}")
d = PyObject_CallFunctionObjArgs(
dcls, <PyObject *>cls, <PyObject *>self, NULL)
- if cls is list:
- if subobj is not None:
- sub_dumper = self.get_dumper(subobj, format)
- d.set_sub_dumper(sub_dumper)
- elif format == FORMAT_TEXT:
- # Special case dumping an empty list (or containing no None
- # element). In text mode we cast them as unknown, so that
- # postgres can cast them automatically to something useful.
- # In binary we cannot do it, it doesn't seem there is a
- # representation for unknown array, so let's dump it as text[].
- # This means that placeholders receiving a binary array should
- # be almost always cast to the target type.
- d.oid = oids.INVALID_OID
-
- PyDict_SetItem(cache, key, d)
+ if sub_dumper is not None:
+ d.set_sub_dumper(sub_dumper)
+
+ PyDict_SetItem(<object>cache, key, d)
return d
cpdef dump_sequence(self, object params, object formats):
cdef int nparams = len(params)
cdef list ps = PyList_New(nparams)
cdef tuple ts = PyTuple_New(nparams)
+ cdef list fs = PyList_New(nparams)
cdef object dumped, oid
cdef Py_ssize_t size
cdef PyObject *dumper_ptr # borrowed pointer to row dumper
dumper_ptr = <PyObject *>tmp_dumper
oid = (<RowDumper>dumper_ptr).oid
+ dfmt = (<RowDumper>dumper_ptr).format
if (<RowDumper>dumper_ptr).cdumper is not None:
dumped = PyByteArray_FromStringAndSize("", 0)
size = (<RowDumper>dumper_ptr).cdumper.cdump(
else:
dumped = None
oid = oids.INVALID_OID
+ dfmt = PQ_TEXT
Py_INCREF(dumped)
PyList_SET_ITEM(ps, i, dumped)
Py_INCREF(oid)
PyTuple_SET_ITEM(ts, i, oid)
+ Py_INCREF(dfmt)
+ PyList_SET_ITEM(fs, i, dfmt)
- return ps, ts, formats
+ return ps, ts, fs
cdef RowDumper _get_row_dumper(self, object param, object fmt):
cdef RowDumper row_dumper = RowDumper()
dumper = self.get_dumper(param, fmt)
row_dumper.dumpfunc = dumper.dump
row_dumper.oid = dumper.oid
+ row_dumper.format = dumper.format
if isinstance(dumper, CDumper):
row_dumper.cdumper = <CDumper>dumper
return out
- def get_loader(self, oid: int, format: Format) -> "Loader":
+ def get_loader(self, oid: int, format: pq.Format) -> "Loader":
return self._c_get_loader(<PyObject *>oid, <PyObject *>format)
cdef object _c_get_loader(self, PyObject *oid, PyObject *fmt):
import logging
+from psycopg3.pq import Format as PqFormat
from psycopg3.pq.misc import PGnotify, connection_summary
from psycopg3_c.pq cimport PQBuffer
param_values: Optional[Sequence[Optional[bytes]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
- int result_format = Format.TEXT,
+ int result_format = PqFormat.TEXT,
) -> PGresult:
_ensure_pgconn(self)
param_values: Optional[Sequence[Optional[bytes]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
- int result_format = Format.TEXT,
+ int result_format = PqFormat.TEXT,
) -> None:
_ensure_pgconn(self)
const char *name,
param_values: Optional[Sequence[Optional[bytes]]],
param_formats: Optional[Sequence[int]] = None,
- int result_format = Format.TEXT,
+ int result_format = PqFormat.TEXT,
) -> None:
_ensure_pgconn(self)
const char *name,
param_values: Optional[Sequence[bytes]],
param_formats: Optional[Sequence[int]] = None,
- int result_format = Format.TEXT,
+ int result_format = PqFormat.TEXT,
) -> PGresult:
_ensure_pgconn(self)
# @cython.final # TODO? causes compile warnings
cdef class IntDumper(CDumper):
- format = Format.TEXT
+ format = PQ_TEXT
def __cinit__(self):
self.oid = oids.INT8_OID
@cython.final
cdef class Int4BinaryDumper(CDumper):
- format = Format.BINARY
+ format = PQ_BINARY
def __cinit__(self):
self.oid = oids.INT4_OID
@cython.final
cdef class Int8BinaryDumper(CDumper):
- format = Format.BINARY
+ format = PQ_BINARY
def __cinit__(self):
self.oid = oids.INT8_OID
@cython.final
cdef class IntLoader(CLoader):
- format = Format.TEXT
+ format = PQ_TEXT
cdef object cload(self, const char *data, size_t length):
# if the number ends with a 0 we don't need a copy
@cython.final
cdef class Int2BinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
return PyLong_FromLong(<int16_t>be16toh((<uint16_t *>data)[0]))
@cython.final
cdef class Int4BinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
return PyLong_FromLong(<int32_t>be32toh((<uint32_t *>data)[0]))
@cython.final
cdef class Int8BinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
return PyLong_FromLongLong(<int64_t>be64toh((<uint64_t *>data)[0]))
@cython.final
cdef class OidBinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
return PyLong_FromUnsignedLong(be32toh((<uint32_t *>data)[0]))
@cython.final
cdef class FloatDumper(CDumper):
- format = Format.TEXT
+ format = PQ_TEXT
def __cinit__(self):
self.oid = oids.FLOAT8_OID
@cython.final
cdef class FloatBinaryDumper(CDumper):
- format = Format.BINARY
+ format = PQ_BINARY
def __cinit__(self):
self.oid = oids.FLOAT8_OID
@cython.final
cdef class FloatLoader(CLoader):
- format = Format.TEXT
+ format = PQ_TEXT
cdef object cload(self, const char *data, size_t length):
cdef double d = PyOS_string_to_double(
@cython.final
cdef class Float4BinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
cdef uint32_t asint = be32toh((<uint32_t *>data)[0])
@cython.final
cdef class Float8BinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
cdef uint64_t asint = be64toh((<uint64_t *>data)[0])
cimport cython
-from psycopg3.pq import Format
-
@cython.final
cdef class BoolDumper(CDumper):
- format = Format.TEXT
+ format = PQ_TEXT
def __cinit__(self):
self.oid = oids.BOOL_OID
@cython.final
cdef class BoolBinaryDumper(CDumper):
- format = Format.BINARY
+ format = PQ_BINARY
def __cinit__(self):
self.oid = oids.BOOL_OID
@cython.final
cdef class BoolLoader(CLoader):
- format = Format.TEXT
+ format = PQ_TEXT
cdef object cload(self, const char *data, size_t length):
# this creates better C than `return data[0] == b't'`
@cython.final
cdef class BoolBinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
return True if data[0] else False
@cython.final
cdef class StringBinaryDumper(_StringDumper):
- format = Format.BINARY
+ format = PQ_BINARY
+
+ def __cinit__(self):
+ self.oid = oids.TEXT_OID
@cython.final
cdef class StringDumper(_StringDumper):
- format = Format.TEXT
+ format = PQ_TEXT
cdef Py_ssize_t cdump(self, obj, bytearray rv, Py_ssize_t offset) except -1:
cdef Py_ssize_t size = StringBinaryDumper.cdump(self, obj, rv, offset)
cdef class _TextLoader(CLoader):
- format = Format.TEXT
+ format = PQ_TEXT
cdef int is_utf8
cdef char *encoding
@cython.final
cdef class TextLoader(_TextLoader):
- format = Format.TEXT
+ format = PQ_TEXT
@cython.final
cdef class TextBinaryLoader(_TextLoader):
- format = Format.BINARY
+ format = PQ_BINARY
@cython.final
cdef class BytesDumper(CDumper):
- format = Format.TEXT
+ format = PQ_TEXT
def __cinit__(self):
self.oid = oids.BYTEA_OID
@cython.final
cdef class BytesBinaryDumper(CDumper):
- format = Format.BINARY
+ format = PQ_BINARY
def __cinit__(self):
self.oid = oids.BYTEA_OID
@cython.final
cdef class ByteaLoader(CLoader):
- format = Format.TEXT
+ format = PQ_TEXT
cdef object cload(self, const char *data, size_t length):
cdef size_t len_out
@cython.final
cdef class ByteaBinaryLoader(CLoader):
- format = Format.BINARY
+ format = PQ_BINARY
cdef object cload(self, const char *data, size_t length):
return data[:length]
def __init__(self, connection):
self.conn = connection
- self.format = Format.TEXT
+ self.format = Format.AUTO
self.records = []
self._schema = None
m(spec, g, w)
def get_supported_types(self):
- dumpers = self.conn.adapters._dumpers[self.format]
+ dumpers = self.conn.adapters._dumpers[Format.as_pq(self.format)]
rv = set()
for cls in dumpers.keys():
if isinstance(cls, str):
import pytest
import psycopg3
+from psycopg3 import pq
from psycopg3.adapt import Transformer, Format, Dumper, Loader
from psycopg3.oids import builtins, TEXT_OID
@pytest.mark.parametrize(
"data, format, result, type",
[
- (1, Format.TEXT, b"1", "numeric"),
+ (1, Format.TEXT, b"1", "int8"),
("hello", Format.TEXT, b"hello", "text"),
("hello", Format.BINARY, b"hello", "text"),
],
t = Transformer()
dumper = t.get_dumper(data, format)
assert dumper.dump(data) == result
- assert dumper.oid == 0 if type == "text" else builtins[type].oid
+ if type == "text" and format != Format.BINARY:
+ assert dumper.oid == 0
+ else:
+ assert dumper.oid == builtins[type].oid
@pytest.mark.parametrize(
def test_dump_connection_ctx(conn):
- make_dumper("t").register(str, conn)
- make_bin_dumper("b").register(str, conn)
+ make_dumper("t").register(MyStr, conn)
+ make_bin_dumper("b").register(MyStr, conn)
cur = conn.cursor()
- cur.execute("select %s, %b", ["hello", "world"])
- assert cur.fetchone() == ("hellot", "worldb")
+ cur.execute("select %s", [MyStr("hello")])
+ assert cur.fetchone() == ("hellob",)
+ cur.execute("select %t", [MyStr("hello")])
+ assert cur.fetchone() == ("hellot",)
+ cur.execute("select %b", [MyStr("hello")])
+ assert cur.fetchone() == ("hellob",)
def test_dump_cursor_ctx(conn):
make_dumper("tc").register(str, cur)
make_bin_dumper("bc").register(str, cur)
- cur.execute("select %s, %b", ["hello", "world"])
- assert cur.fetchone() == ("hellotc", "worldbc")
+ cur.execute("select %s", [MyStr("hello")])
+ assert cur.fetchone() == ("hellobc",)
+ cur.execute("select %t", [MyStr("hello")])
+ assert cur.fetchone() == ("hellotc",)
+ cur.execute("select %b", [MyStr("hello")])
+ assert cur.fetchone() == ("hellobc",)
cur = conn.cursor()
- cur.execute("select %s, %b", ["hello", "world"])
- assert cur.fetchone() == ("hellot", "worldb")
+ cur.execute("select %s", [MyStr("hello")])
+ assert cur.fetchone() == ("hellob",)
+ cur.execute("select %t", [MyStr("hello")])
+ assert cur.fetchone() == ("hellot",)
+ cur.execute("select %b", [MyStr("hello")])
+ assert cur.fetchone() == ("hellob",)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
return (obj * 2).encode("utf-8")
MyStringDumper.register(str, conn)
- assert conn.execute("select %s", ["hello"]).fetchone()[0] == "hellohello"
+ assert conn.execute("select %t", ["hello"]).fetchone()[0] == "hellohello"
def test_subclass_loader(conn):
@pytest.mark.parametrize(
"data, format, type, result",
[
- (b"1", Format.TEXT, "int4", 1),
- (b"hello", Format.TEXT, "text", "hello"),
- (b"hello", Format.BINARY, "text", "hello"),
+ (b"1", pq.Format.TEXT, "int4", 1),
+ (b"hello", pq.Format.TEXT, "text", "hello"),
+ (b"hello", pq.Format.BINARY, "text", "hello"),
],
)
def test_cast(data, format, type, result):
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellotc",)
- cur.format = Format.BINARY
+ cur.format = pq.Format.BINARY
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellobc",)
cur = conn.cursor()
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellot",)
- cur.format = Format.BINARY
+ cur.format = pq.Format.BINARY
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellob",)
"sql, obj",
[("'{hello}'::text[]", ["helloc"]), ("row('hello'::text)", ("helloc",))],
)
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_cursor_ctx_nested(conn, sql, obj, fmt_out):
cur = conn.cursor(format=fmt_out)
- if fmt_out == Format.TEXT:
+ if fmt_out == pq.Format.TEXT:
make_loader("c").register(TEXT_OID, cur)
else:
make_bin_loader("c").register(TEXT_OID, cur)
assert res == obj
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_array_dumper(conn, fmt_out):
t = Transformer(conn)
- dint = t.get_dumper([0], fmt_out)
+ fmt_in = Format.from_pq(fmt_out)
+ dint = t.get_dumper([0], fmt_in)
assert dint.oid == builtins["int8"].array_oid
assert dint.sub_oid == builtins["int8"].oid
- dstr = t.get_dumper([""], fmt_out)
- assert dstr.oid == builtins["text"].array_oid
+ dstr = t.get_dumper([""], fmt_in)
+ assert dstr.oid == (
+ builtins["text"].array_oid if fmt_in == Format.BINARY else 0
+ )
assert dstr.sub_oid == builtins["text"].oid
assert dstr is not dint
- assert t.get_dumper([1], fmt_out) is dint
- assert t.get_dumper([None, [1]], fmt_out) is dint
+ assert t.get_dumper([1], fmt_in) is dint
+ assert t.get_dumper([None, [1]], fmt_in) is dint
- dempty = t.get_dumper([], fmt_out)
- assert t.get_dumper([None, [None]], fmt_out) is dempty
- if fmt_out == Format.TEXT:
- assert dempty.oid == 0
- else:
- assert dempty.oid == builtins["text"].array_oid
- assert dempty.sub_oid == builtins["text"].oid
+ dempty = t.get_dumper([], fmt_in)
+ assert t.get_dumper([None, [None]], fmt_in) is dempty
+ assert dempty.oid == 0
+ assert dempty.dump([]) == b"{}"
L = []
L.append(L)
assert t.get_dumper(L, fmt_out)
+def test_string_connection_ctx(conn):
+ make_dumper("t").register(str, conn)
+ make_bin_dumper("b").register(str, conn)
+
+ cur = conn.cursor()
+ cur.execute("select %s", ["hello"])
+ assert cur.fetchone() == ("hellot",) # str prefers text
+ cur.execute("select %t", ["hello"])
+ assert cur.fetchone() == ("hellot",)
+ cur.execute("select %b", ["hello"])
+ assert cur.fetchone() == ("hellob",)
+
+
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
def test_none_type_argument(conn, fmt_in):
cur = conn.cursor()
assert not c_adapters
+class MyStr(str):
+ pass
+
+
def make_dumper(suffix):
"""Create a test dumper appending a suffix to the bytes representation."""
class TestDumper(Dumper):
oid = TEXT_OID
- format = Format.TEXT
+ format = pq.Format.TEXT
def dump(self, s):
return (s + suffix).encode("ascii")
def make_bin_dumper(suffix):
cls = make_dumper(suffix)
- cls.format = Format.BINARY
+ cls.format = pq.Format.BINARY
return cls
"""Create a test loader appending a suffix to the data returned."""
class TestLoader(Loader):
- format = Format.TEXT
+ format = pq.Format.TEXT
def load(self, b):
return b.decode("ascii") + suffix
def make_bin_loader(suffix):
cls = make_loader(suffix)
- cls.format = Format.BINARY
+ cls.format = pq.Format.BINARY
return cls
from psycopg3 import pq
from psycopg3 import sql
from psycopg3 import errors as e
+from psycopg3.pq import Format
from psycopg3.oids import builtins
-from psycopg3.adapt import Format
from psycopg3.types.numeric import Int4
eur = "\u20ac"
from psycopg3 import pq
from psycopg3 import sql
from psycopg3 import errors as e
+from psycopg3.pq import Format
from psycopg3.oids import builtins
-from psycopg3.adapt import Format
from .test_copy import sample_text, sample_binary, sample_binary_rows # noqa
from .test_copy import eur, sample_values, sample_records, sample_tabledef
import pytest
import psycopg3
+from psycopg3 import pq
from psycopg3.oids import builtins
from psycopg3.adapt import Format
def test_execute_binary_result(conn):
- cur = conn.cursor(format=Format.BINARY)
+ cur = conn.cursor(format=pq.Format.BINARY)
cur.execute("select %s::text, %s::text", ["foo", None])
assert cur.pgresult.fformat(0) == 1
cur.executemany(query, [(10, "hello"), (20, "world")])
-@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
-def test_executemany_null_first(conn, fmt):
- ph = "%s" if fmt == Format.TEXT else "%b"
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+def test_executemany_null_first(conn, fmt_in):
cur = conn.cursor()
cur.execute("create table testmany (a bigint, b bigint)")
cur.executemany(
- f"insert into testmany values ({ph}, {ph})", [[1, None], [3, 4]]
+ f"insert into testmany values (%{fmt_in}, %{fmt_in})",
+ [[1, None], [3, 4]],
)
- with pytest.raises(
- (
- psycopg3.errors.InvalidTextRepresentation,
- psycopg3.errors.ProtocolViolation,
- )
- ):
+ with pytest.raises((psycopg3.DataError, psycopg3.ProgrammingError)):
cur.executemany(
- f"insert into testmany values ({ph}, {ph})", [[1, ""], [3, 4]]
+ f"insert into testmany values (%{fmt_in}, %{fmt_in})",
+ [[1, ""], [3, 4]],
)
assert cur.query is None
assert cur.params is None
- cur.execute("select %s, %s::text", [1, None])
+ cur.execute("select %t, %s::text", [1, None])
assert cur.query == b"select $1, $2::text"
assert cur.params == [b"1", None]
assert cur.params is None
with pytest.raises(psycopg3.DataError):
- cur.execute("select %s::int", ["wat"])
+ cur.execute("select %t::int", ["wat"])
assert cur.query == b"select $1::int"
assert cur.params == [b"wat"]
def test_query_params_executemany(conn):
cur = conn.cursor()
- cur.executemany("select %s, %s", [[1, 2], [3, 4]])
+ cur.executemany("select %t, %t", [[1, 2], [3, 4]])
assert cur.query == b"select $1, $2"
assert cur.params == [b"3", b"4"]
with pytest.raises((psycopg3.DataError, TypeError)):
- cur.executemany("select %s::int", [[1], ["x"], [2]])
+ cur.executemany("select %t::int", [[1], ["x"], [2]])
assert cur.query == b"select $1::int"
# TODO: cannot really check this: after introduced row_dumpers, this
# fails dumping, not query passing.
@pytest.mark.slow
-@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_leak_fetchall(dsn, faker, fmt):
- if fmt == Format.TEXT:
+ if fmt != Format.BINARY:
pytest.xfail("faker to extend to all text dumpers")
faker.format = fmt
n = []
for i in range(3):
with psycopg3.connect(dsn) as conn:
- with conn.cursor(format=fmt) as cur:
+ with conn.cursor(format=Format.as_pq(fmt)) as cur:
cur.execute(faker.drop_stmt)
cur.execute(faker.create_stmt)
cur.executemany(faker.insert_stmt, faker.records)
import weakref
import psycopg3
+from psycopg3 import pq
from psycopg3.adapt import Format
pytestmark = pytest.mark.asyncio
async def test_execute_binary_result(aconn):
- cur = await aconn.cursor(format=Format.BINARY)
+ cur = await aconn.cursor(format=pq.Format.BINARY)
await cur.execute("select %s::text, %s::text", ["foo", None])
assert cur.pgresult.fformat(0) == 1
await cur.executemany(query, [(10, "hello"), (20, "world")])
-@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
-async def test_executemany_null_first(aconn, fmt):
- ph = "%s" if fmt == Format.TEXT else "%b"
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+async def test_executemany_null_first(aconn, fmt_in):
cur = await aconn.cursor()
await cur.execute("create table testmany (a bigint, b bigint)")
await cur.executemany(
- f"insert into testmany values ({ph}, {ph})", [[1, None], [3, 4]]
+ f"insert into testmany values (%{fmt_in}, %{fmt_in})",
+ [[1, None], [3, 4]],
)
- with pytest.raises(
- (
- psycopg3.errors.InvalidTextRepresentation,
- psycopg3.errors.ProtocolViolation,
- )
- ):
+ with pytest.raises((psycopg3.DataError, psycopg3.ProgrammingError)):
await cur.executemany(
- f"insert into testmany values ({ph}, {ph})", [[1, ""], [3, 4]]
+ f"insert into testmany values (%{fmt_in}, %{fmt_in})",
+ [[1, ""], [3, 4]],
)
assert cur.query is None
assert cur.params is None
- await cur.execute("select %s, %s::text", [1, None])
+ await cur.execute("select %t, %s::text", [1, None])
assert cur.query == b"select $1, $2::text"
assert cur.params == [b"1", None]
assert cur.params is None
with pytest.raises(psycopg3.DataError):
- await cur.execute("select %s::int", ["wat"])
+ await cur.execute("select %t::int", ["wat"])
assert cur.query == b"select $1::int"
assert cur.params == [b"wat"]
async def test_query_params_executemany(aconn):
cur = await aconn.cursor()
- await cur.executemany("select %s, %s", [[1, 2], [3, 4]])
+ await cur.executemany("select %t, %t", [[1, 2], [3, 4]])
assert cur.query == b"select $1, $2"
assert cur.params == [b"3", b"4"]
with pytest.raises((psycopg3.DataError, TypeError)):
- await cur.executemany("select %s::int", [[1], ["x"], [2]])
+ await cur.executemany("select %t::int", [[1], ["x"], [2]])
assert cur.query == b"select $1::int"
# TODO: cannot really check this: after introduced row_dumpers, this
# fails dumping, not query passing.
import pytest
import psycopg3
-from psycopg3.adapt import Transformer
+from psycopg3 import pq
+from psycopg3.adapt import Transformer, Format
from psycopg3._queries import PostgresQuery, _split_query
@pytest.mark.parametrize(
"input, want",
[
- (b"", [(b"", 0, 0)]),
- (b"foo bar", [(b"foo bar", 0, 0)]),
- (b"foo %% bar", [(b"foo % bar", 0, 0)]),
- (b"%s", [(b"", 0, 0), (b"", 0, 0)]),
- (b"%s foo", [(b"", 0, 0), (b" foo", 0, 0)]),
- (b"%b foo", [(b"", 0, 1), (b" foo", 0, 0)]),
- (b"foo %s", [(b"foo ", 0, 0), (b"", 0, 0)]),
- (b"foo %%%s bar", [(b"foo %", 0, 0), (b" bar", 0, 0)]),
- (b"foo %(name)s bar", [(b"foo ", "name", 0), (b" bar", 0, 0)]),
+ (b"", [(b"", 0, Format.AUTO)]),
+ (b"foo bar", [(b"foo bar", 0, Format.AUTO)]),
+ (b"foo %% bar", [(b"foo % bar", 0, Format.AUTO)]),
+ (b"%s", [(b"", 0, Format.AUTO), (b"", 0, Format.AUTO)]),
+ (b"%s foo", [(b"", 0, Format.AUTO), (b" foo", 0, Format.AUTO)]),
+ (b"%b foo", [(b"", 0, Format.BINARY), (b" foo", 0, Format.AUTO)]),
+ (b"foo %s", [(b"foo ", 0, Format.AUTO), (b"", 0, Format.AUTO)]),
+ (
+ b"foo %%%s bar",
+ [(b"foo %", 0, Format.AUTO), (b" bar", 0, Format.AUTO)],
+ ),
+ (
+ b"foo %(name)s bar",
+ [(b"foo ", "name", Format.AUTO), (b" bar", 0, Format.AUTO)],
+ ),
(
b"foo %(name)s %(name)b bar",
- [(b"foo ", "name", 0), (b" ", "name", 1), (b" bar", 0, 0)],
+ [
+ (b"foo ", "name", Format.AUTO),
+ (b" ", "name", Format.BINARY),
+ (b" bar", 0, Format.AUTO),
+ ],
),
(
b"foo %s%b bar %s baz",
- [(b"foo ", 0, 0), (b"", 1, 1), (b" bar ", 2, 0), (b" baz", 0, 0)],
+ [
+ (b"foo ", 0, Format.AUTO),
+ (b"", 1, Format.BINARY),
+ (b" bar ", 2, Format.AUTO),
+ (b" baz", 0, Format.AUTO),
+ ],
),
],
)
(b"", None, b"", None, None),
(b"", [], b"", [], []),
(b"%%", [], b"%", [], []),
- (b"select %s", (1,), b"select $1", [False], [b"1"]),
- (b"%s %% %s", (1, 2), b"$1 % $2", [False, False], [b"1", b"2"]),
- (b"%b %% %s", ("a", 2), b"$1 % $2", [True, False], [b"a", b"2"]),
+ (b"select %t", (1,), b"select $1", [pq.Format.TEXT], [b"1"]),
+ (
+ b"%t %% %t",
+ (1, 2),
+ b"$1 % $2",
+ [pq.Format.TEXT, pq.Format.TEXT],
+ [b"1", b"2"],
+ ),
+ (
+ b"%t %% %t",
+ ("a", 2),
+ b"$1 % $2",
+ [pq.Format.TEXT, pq.Format.TEXT],
+ [b"a", b"2"],
+ ),
],
)
def test_pg_query_seq(query, params, want, wformats, wparams):
(b"", {}, b"", [], []),
(b"hello %%", {"a": 1}, b"hello %", [], []),
(
- b"select %(hello)s",
+ b"select %(hello)t",
{"hello": 1, "world": 2},
b"select $1",
- [False],
+ [pq.Format.TEXT],
[b"1"],
),
(
- b"select %(hi)s %(there)b %(hi)s",
- {"hi": 1, "there": "a"},
+ b"select %(hi)s %(there)s %(hi)s",
+ {"hi": 0, "there": "a"},
b"select $1 $2 $1",
- [False, True],
- [b"1", b"a"],
+ [pq.Format.BINARY, pq.Format.TEXT],
+ [b"\x00" * 8, b"a"],
),
],
)
import pytest
from psycopg3 import sql, ProgrammingError
-from psycopg3.pq import Format
+from psycopg3.adapt import Format
@pytest.mark.parametrize(
import pytest
import psycopg3
+from psycopg3 import pq
from psycopg3.oids import builtins
from psycopg3.adapt import Format, Transformer
from psycopg3.types import array
),
]
+fmts_in = [Format.AUTO, Format.TEXT, Format.BINARY]
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+
+@pytest.mark.parametrize("fmt_in", fmts_in)
@pytest.mark.parametrize("obj, want", tests_str)
def test_dump_list_str(conn, obj, want, fmt_in):
cur = conn.cursor()
- ph = "%s" if fmt_in == Format.TEXT else "%b"
- cur.execute(f"select {ph}::text[] = %s::text[]", (obj, want))
+ cur.execute(f"select %{fmt_in}::text[] = %s::text[]", (obj, want))
assert cur.fetchone()[0]
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("want, obj", tests_str)
def test_load_list_str(conn, obj, want, fmt_out):
cur = conn.cursor(format=fmt_out)
assert cur.fetchone()[0] == want
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", fmts_in)
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_all_chars(conn, fmt_in, fmt_out):
cur = conn.cursor(format=fmt_out)
- ph = "%s" if fmt_in == Format.TEXT else "%b"
for i in range(1, 256):
c = chr(i)
- cur.execute(f"select {ph}::text[]", ([c],))
+ cur.execute(f"select %{fmt_in}::text[]", ([c],))
assert cur.fetchone()[0] == [c]
a = list(map(chr, range(1, 256)))
a.append("\u20ac")
- cur.execute(f"select {ph}::text[]", (a,))
+ cur.execute(f"select %{fmt_in}::text[]", (a,))
assert cur.fetchone()[0] == a
a = "".join(a)
- cur.execute(f"select {ph}::text[]", ([a],))
+ cur.execute(f"select %{fmt_in}::text[]", ([a],))
assert cur.fetchone()[0] == [a]
[
[["a"], ["b", "c"]],
[["a"], []],
- [[]],
[[["a"]], ["b"]],
# [["a"], [["b"]]], # todo, but expensive (an isinstance per item)
# [True, b"a"], # TODO expensive too
tx.get_dumper(input, Format.BINARY).dump(input)
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("want, obj", tests_int)
def test_load_list_int(conn, obj, want, fmt_out):
cur = conn.cursor(format=fmt_out)
assert dumper.oid == builtins[type].array_oid
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", fmts_in)
def test_empty_list_mix(conn, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
objs = list(range(3))
- # pro tip: don't get confused with the types
conn.execute("create table testarrays (col1 bigint[], col2 bigint[])")
- if fmt_in == Format.TEXT:
- f1, f2 = conn.execute(
- f"insert into testarrays values ({ph}, {ph}) returning *",
- (objs, []),
- ).fetchone()
- else:
- # TODO: fix passing empty lists in binary format
- try:
- f1, f2 = conn.execute(
- f"insert into testarrays values ({ph}, {ph}) returning *",
- (objs, []),
- ).fetchone()
- except psycopg3.errors.DatatypeMismatch:
- pytest.xfail("empty lists in binary format not supported")
- else:
- assert False, "you fixed the thing, now fix the test!"
-
+ # pro tip: don't get confused with the types
+ f1, f2 = conn.execute(
+ f"insert into testarrays values (%{fmt_in}, %{fmt_in}) returning *",
+ (objs, []),
+ ).fetchone()
assert f1 == objs
assert f2 == []
-def test_empty_list_text(conn):
+@pytest.mark.parametrize("fmt_in", fmts_in)
+def test_empty_list(conn, fmt_in):
cur = conn.cursor()
cur.execute("create table test (id serial primary key, data date[])")
with conn.transaction():
- cur.execute("insert into test (data) values (%s)", ([],))
- cur.execute("select data from test")
- assert cur.fetchone() == ([],)
-
- # test untyped list in a filter
- cur.execute("select data from test where id = any(%s)", ([1],))
- assert cur.fetchone()
- cur.execute("select data from test where id = any(%s)", ([],))
- assert not cur.fetchone()
-
-
-def test_empty_list_binary(conn):
- cur = conn.cursor()
- cur.execute("create table test (id serial primary key, data date[])")
- with pytest.raises(psycopg3.errors.DatatypeMismatch):
- with conn.transaction():
- cur.execute("insert into test (data) values (%b)", ([],))
- cur.execute("insert into test (data) values (%b::date[])", ([],))
-
+ cur.execute(f"insert into test (data) values (%{fmt_in})", ([],))
cur.execute("select data from test")
assert cur.fetchone() == ([],)
# test untyped list in a filter
- cur.execute("select data from test where id = any(%b)", ([1],))
+ cur.execute(f"select data from test where id = any(%{fmt_in})", ([1],))
assert cur.fetchone()
- with pytest.raises(psycopg3.errors.UndefinedFunction):
- with conn.transaction():
- cur.execute("select data from test where id = any(%b)", ([],))
- cur.execute("select data from test where id = any(%b::int[])", ([],))
+ cur.execute(f"select data from test where id = any(%{fmt_in})", ([],))
assert not cur.fetchone()
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", fmts_in)
def test_empty_list_after_choice(conn, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
cur = conn.cursor()
cur.execute("create table test (id serial primary key, data float[])")
cur.executemany(
- f"insert into test (data) values ({ph})", [([1.0],), ([],)]
+ f"insert into test (data) values (%{fmt_in})", [([1.0],), ([],)]
)
cur.execute("select data from test order by id")
assert cur.fetchall() == [([1.0],), ([],)]
import pytest
+from psycopg3 import pq
from psycopg3.sql import Identifier
from psycopg3.oids import builtins
from psycopg3.adapt import Format, global_adapters
assert res == obj
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_all_chars(conn, fmt_out):
cur = conn.cursor(format=fmt_out)
for i in range(1, 256):
assert info.fields[i].type_oid == builtins[t].oid
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_dump_composite_all_chars(conn, fmt_in, testcomp):
if fmt_in == Format.BINARY:
pytest.xfail("binary composite dumper not implemented")
- ph = "%s" if fmt_in == Format.TEXT else "%b"
cur = conn.cursor()
for i in range(1, 256):
(res,) = cur.execute(
- f"select row(chr(%s::int), 1, 1.0)::testcomp = {ph}::testcomp",
+ f"select row(chr(%s::int), 1, 1.0)::testcomp = %{fmt_in}::testcomp",
(i, (chr(i), 1, 1.0)),
).fetchone()
assert res is True
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_composite(conn, testcomp, fmt_out):
info = CompositeInfo.fetch(conn, "testcomp")
info.register(conn)
assert isinstance(res[0].baz, float)
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_composite_factory(conn, testcomp, fmt_out):
info = CompositeInfo.fetch(conn, "testcomp")
def test_register_scope(conn, testcomp):
info = CompositeInfo.fetch(conn, "testcomp")
info.register()
- for fmt in (Format.TEXT, Format.BINARY):
+ for fmt in (pq.Format.TEXT, pq.Format.BINARY):
for oid in (info.oid, info.array_oid):
assert global_adapters._loaders[fmt].pop(oid)
cur = conn.cursor()
info.register(cur)
- for fmt in (Format.TEXT, Format.BINARY):
+ for fmt in (pq.Format.TEXT, pq.Format.BINARY):
for oid in (info.oid, info.array_oid):
assert oid not in global_adapters._loaders[fmt]
assert oid not in conn.adapters._loaders[fmt]
assert oid in cur.adapters._loaders[fmt]
info.register(conn)
- for fmt in (Format.TEXT, Format.BINARY):
+ for fmt in (pq.Format.TEXT, pq.Format.BINARY):
for oid in (info.oid, info.array_oid):
assert oid not in global_adapters._loaders[fmt]
assert oid in conn.adapters._loaders[fmt]
import pytest
-import psycopg3.types.json
-from psycopg3.types.json import Json, Jsonb
+import psycopg3.types
+from psycopg3 import pq
+from psycopg3.types import Json, Jsonb
from psycopg3.adapt import Format
samples = [
@pytest.mark.parametrize("val", samples)
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_json_dump(conn, val, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
obj = json.loads(val)
cur = conn.cursor()
- cur.execute(f"select pg_typeof({ph}) = 'json'::regtype", (Json(obj),))
+ cur.execute(f"select pg_typeof(%{fmt_in}) = 'json'::regtype", (Json(obj),))
assert cur.fetchone()[0] is True
- cur.execute(f"select {ph}::text = %s::json::text", (Json(obj), val))
+ cur.execute(f"select %{fmt_in}::text = %s::json::text", (Json(obj), val))
assert cur.fetchone()[0] is True
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("val", samples)
def test_jsonb_dump(conn, val, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
obj = json.loads(val)
cur = conn.cursor()
- cur.execute(f"select {ph} = %s::jsonb", (Jsonb(obj), val))
+ cur.execute(f"select %{fmt_in} = %s::jsonb", (Jsonb(obj), val))
assert cur.fetchone()[0] is True
@pytest.mark.parametrize("val", samples)
@pytest.mark.parametrize("jtype", ["json", "jsonb"])
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_json_load(conn, val, jtype, fmt_out):
cur = conn.cursor(format=fmt_out)
cur.execute(f"select %s::{jtype}", (val,))
assert cur.fetchone()[0] == json.loads(val)
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("wrapper", ["Json", "Jsonb"])
def test_json_dump_customise(conn, wrapper, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
- wrapper = getattr(psycopg3.types.json, wrapper)
+ wrapper = getattr(psycopg3.types, wrapper)
obj = {"foo": "bar"}
cur = conn.cursor()
cur.execute(
- f"select {ph}->>'baz' = 'qux'", (wrapper(obj, dumps=my_dumps),)
+ f"select %{fmt_in}->>'baz' = 'qux'", (wrapper(obj, dumps=my_dumps),)
)
assert cur.fetchone()[0] is True
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("wrapper", ["Json", "Jsonb"])
def test_json_dump_subclass(conn, wrapper, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
- wrapper = getattr(psycopg3.types.json, wrapper)
+ wrapper = getattr(psycopg3.types, wrapper)
class MyWrapper(wrapper):
def dumps(self):
obj = {"foo": "bar"}
cur = conn.cursor()
- cur.execute(f"select {ph}->>'baz' = 'qux'", (MyWrapper(obj),))
+ cur.execute(f"select %{fmt_in}->>'baz' = 'qux'", (MyWrapper(obj),))
assert cur.fetchone()[0] is True
import pytest
+from psycopg3 import pq
from psycopg3.adapt import Format
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("val", ["192.168.0.1", "2001:db8::"])
def test_address_dump(conn, fmt_in, val):
binary_check(fmt_in)
- ph = "%s" if fmt_in == Format.TEXT else "%b"
cur = conn.cursor()
- cur.execute(f"select {ph} = %s::inet", (ipaddress.ip_address(val), val))
+ cur.execute(
+ f"select %{fmt_in} = %s::inet", (ipaddress.ip_address(val), val)
+ )
assert cur.fetchone()[0] is True
cur.execute(
- f"select {ph} = array[null, %s]::inet[]",
+ f"select %{fmt_in} = array[null, %s]::inet[]",
([None, ipaddress.ip_interface(val)], val),
)
assert cur.fetchone()[0] is True
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("val", ["127.0.0.1/24", "::ffff:102:300/128"])
def test_interface_dump(conn, fmt_in, val):
binary_check(fmt_in)
- ph = "%s" if fmt_in == Format.TEXT else "%b"
cur = conn.cursor()
- cur.execute(f"select {ph} = %s::inet", (ipaddress.ip_interface(val), val))
+ cur.execute(
+ f"select %{fmt_in} = %s::inet", (ipaddress.ip_interface(val), val)
+ )
assert cur.fetchone()[0] is True
cur.execute(
- f"select {ph} = array[null, %s]::inet[]",
+ f"select %{fmt_in} = array[null, %s]::inet[]",
([None, ipaddress.ip_interface(val)], val),
)
assert cur.fetchone()[0] is True
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("val", ["127.0.0.0/24", "::ffff:102:300/128"])
def test_network_dump(conn, fmt_in, val):
binary_check(fmt_in)
- ph = "%s" if fmt_in == Format.TEXT else "%b"
cur = conn.cursor()
- cur.execute(f"select {ph} = %s::cidr", (ipaddress.ip_network(val), val))
+ cur.execute(
+ f"select %{fmt_in} = %s::cidr", (ipaddress.ip_network(val), val)
+ )
assert cur.fetchone()[0] is True
cur.execute(
- f"select {ph} = array[NULL, %s]::cidr[]",
+ f"select %{fmt_in} = array[NULL, %s]::cidr[]",
([None, ipaddress.ip_network(val)], val),
)
assert cur.fetchone()[0] is True
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("val", ["127.0.0.1/32", "::ffff:102:300/128"])
def test_inet_load_address(conn, fmt_out, val):
binary_check(fmt_out)
assert cur.fetchone()[0] == [None, addr]
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("val", ["127.0.0.1/24", "::ffff:102:300/127"])
def test_inet_load_network(conn, fmt_out, val):
binary_check(fmt_out)
assert cur.fetchone()[0] == [None, ipaddress.ip_interface(val)]
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("val", ["127.0.0.0/24", "::ffff:102:300/128"])
def test_cidr_load(conn, fmt_out, val):
binary_check(fmt_out)
def binary_check(fmt):
- if fmt == Format.BINARY:
+ if fmt == Format.BINARY or fmt == pq.Format.BINARY:
pytest.xfail("inet binary not implemented")
import pytest
import psycopg3
+from psycopg3 import pq
from psycopg3 import sql
from psycopg3.oids import builtins
from psycopg3.adapt import Transformer, Format
)
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
def test_dump_int(conn, val, expr, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
assert isinstance(val, int)
cur = conn.cursor()
- cur.execute(f"select {expr} = {ph}", (val,))
+ cur.execute(f"select {expr} = %{fmt_in}", (val,))
assert cur.fetchone()[0] is True
tname = builtins[expr.rsplit(":", 1)[-1]].name.title()
assert tname in "Int2 Int4 Int8 Oid".split()
Type = getattr(psycopg3.types.numeric, tname)
- ph = "%s" if fmt_in == Format.TEXT else "%b"
cur = conn.cursor()
- cur.execute(f"select pg_typeof({expr}) = pg_typeof({ph})", (Type(val),))
+ cur.execute(
+ f"select pg_typeof({expr}) = pg_typeof(%{fmt_in})", (Type(val),)
+ )
assert cur.fetchone()[0] is True
- cur.execute(f"select {expr} = {ph}", (Type(val),))
+ cur.execute(f"select {expr} = %{fmt_in}", (Type(val),))
assert cur.fetchone()[0] is True
)
def test_quote_int(conn, val, expr):
tx = Transformer()
- assert tx.get_dumper(val, 0).quote(val) == expr
+ assert tx.get_dumper(val, Format.TEXT).quote(val) == expr
cur = conn.cursor()
cur.execute(sql.SQL("select {v}, -{v}").format(v=sql.Literal(val)))
("4294967295", "oid", 4294967295),
],
)
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_int(conn, val, pgtype, want, fmt_out):
cur = conn.cursor(format=fmt_out)
cur.execute(f"select %s::{pgtype}", (val,))
)
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
def test_dump_float(conn, val, expr, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
assert isinstance(val, float)
cur = conn.cursor()
- cur.execute(f"select {ph} = {expr}::float8", (val,))
+ cur.execute(f"select %{fmt_in} = {expr}::float8", (val,))
assert cur.fetchone()[0] is True
)
def test_quote_float(conn, val, expr):
tx = Transformer()
- assert tx.get_dumper(val, 0).quote(val) == expr
+ assert tx.get_dumper(val, Format.TEXT).quote(val) == expr
cur = conn.cursor()
cur.execute(sql.SQL("select {v}, -{v}").format(v=sql.Literal(val)))
("-inf", "float8", -float("inf")),
],
)
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_float(conn, val, pgtype, want, fmt_out):
cur = conn.cursor(format=fmt_out)
cur.execute(f"select %s::{pgtype}", (val,))
("-1.42e40", "float8", -1.42e40),
],
)
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_float_approx(conn, expr, pgtype, want, fmt_out):
cur = conn.cursor(format=fmt_out)
cur.execute("select %s::%s" % (expr, pgtype))
def test_quote_numeric(conn, val, expr):
val = Decimal(val)
tx = Transformer()
- assert tx.get_dumper(val, 0).quote(val) == expr
+ assert tx.get_dumper(val, Format.TEXT).quote(val) == expr
cur = conn.cursor()
cur.execute(sql.SQL("select {v}, -{v}").format(v=sql.Literal(val)))
import pytest
+from psycopg3 import pq
from psycopg3 import sql
from psycopg3.oids import builtins
from psycopg3.adapt import Transformer, Format
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("b", [True, False])
def test_roundtrip_bool(conn, b, fmt_in, fmt_out):
cur = conn.cursor(format=fmt_out)
- ph = "%s" if fmt_in == Format.TEXT else "%b"
- result = cur.execute(f"select {ph}", (b,)).fetchone()[0]
+ result = cur.execute(f"select %{fmt_in}", (b,)).fetchone()[0]
assert cur.pgresult.fformat(0) == fmt_out
if b is not None:
assert cur.pgresult.ftype(0) == builtins["bool"].oid
assert result is b
- result = cur.execute(f"select {ph}", ([b],)).fetchone()[0]
+ result = cur.execute(f"select %{fmt_in}", ([b],)).fetchone()[0]
assert cur.pgresult.fformat(0) == fmt_out
if b is not None:
assert cur.pgresult.ftype(0) == builtins["bool"].array_oid
def test_quote_bool(conn, val):
tx = Transformer()
- assert tx.get_dumper(val, 0).quote(val) == str(val).lower().encode("ascii")
+ assert tx.get_dumper(val, Format.TEXT).quote(val) == str(
+ val
+ ).lower().encode("ascii")
cur = conn.cursor()
cur.execute(sql.SQL("select {v}").format(v=sql.Literal(val)))
def test_quote_none(conn):
tx = Transformer()
- assert tx.get_dumper(None, 0).quote(None) == b"NULL"
+ assert tx.get_dumper(None, Format.TEXT).quote(None) == b"NULL"
cur = conn.cursor()
cur.execute(sql.SQL("select {v}").format(v=sql.Literal(None)))
import pytest
import psycopg3
+from psycopg3 import pq
from psycopg3 import sql
from psycopg3.adapt import Format
#
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_dump_1char(conn, fmt_in):
cur = conn.cursor()
- ph = "%s" if fmt_in == Format.TEXT else "%b"
for i in range(1, 256):
- cur.execute(f"select {ph} = chr(%s::int)", (chr(i), i))
+ cur.execute(f"select %{fmt_in} = chr(%s::int)", (chr(i), i))
assert cur.fetchone()[0] is True, chr(i)
assert cur.fetchone()[0] is True, chr(i)
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_dump_zero(conn, fmt_in):
cur = conn.cursor()
- ph = "%s" if fmt_in == Format.TEXT else "%b"
s = "foo\x00bar"
with pytest.raises(psycopg3.DataError):
- cur.execute(f"select {ph}::text", (s,))
+ cur.execute(f"select %{fmt_in}::text", (s,))
def test_quote_zero(conn):
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_1char(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
for i in range(1, 256):
assert cur.pgresult.fformat(0) == fmt_out
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("encoding", ["utf8", "latin9", "ascii"])
def test_dump_enc(conn, fmt_in, encoding):
cur = conn.cursor()
- ph = "%s" if fmt_in == Format.TEXT else "%b"
conn.client_encoding = encoding
- (res,) = cur.execute(f"select ascii({ph})", (eur,)).fetchone()
+ (res,) = cur.execute(f"select ascii(%{fmt_in})", (eur,)).fetchone()
assert res == ord(eur)
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_dump_badenc(conn, fmt_in):
cur = conn.cursor()
- ph = "%s" if fmt_in == Format.TEXT else "%b"
conn.client_encoding = "latin1"
with pytest.raises(UnicodeEncodeError):
- cur.execute(f"select {ph}::bytea", (eur,))
+ cur.execute(f"select %{fmt_in}::bytea", (eur,))
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_dump_utf8_badenc(conn, fmt_in):
cur = conn.cursor()
- ph = "%s" if fmt_in == Format.TEXT else "%b"
conn.client_encoding = "utf-8"
with pytest.raises(UnicodeEncodeError):
- cur.execute(f"select {ph}", ("\uddf8",))
+ cur.execute(f"select %{fmt_in}", ("\uddf8",))
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_load_enc(conn, typename, encoding, fmt_out):
assert res == eur
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_load_badenc(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
cur.execute(f"select chr(%s::int)::{typename}", (ord(eur),))
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_load_ascii(conn, typename, fmt_out):
cur = conn.cursor(format=fmt_out)
assert res == eur.encode("utf8")
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
def test_text_array(conn, typename, fmt_in, fmt_out):
cur = conn.cursor(format=fmt_out)
- ph = "%s" if fmt_in == Format.TEXT else "%b"
a = list(map(chr, range(1, 256))) + [eur]
- (res,) = cur.execute(f"select {ph}::{typename}[]", (a,)).fetchone()
+ (res,) = cur.execute(f"select %{fmt_in}::{typename}[]", (a,)).fetchone()
assert res == a
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_text_array_ascii(conn, fmt_in, fmt_out):
conn.client_encoding = "ascii"
cur = conn.cursor(format=fmt_out)
a = list(map(chr, range(1, 256))) + [eur]
exp = [s.encode("utf8") for s in a]
- ph = "%s" if fmt_in == Format.TEXT else "%b"
- (res,) = cur.execute(f"select {ph}::text[]", (a,)).fetchone()
+ (res,) = cur.execute(f"select %{fmt_in}::text[]", (a,)).fetchone()
assert res == exp
#
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("pytype", [bytes, bytearray, memoryview])
def test_dump_1byte(conn, fmt_in, pytype):
cur = conn.cursor()
- ph = "%s" if fmt_in == Format.TEXT else "%b"
for i in range(0, 256):
obj = pytype(bytes([i]))
- cur.execute(f"select {ph} = %s::bytea", (obj, fr"\x{i:02x}"))
+ cur.execute(f"select %{fmt_in} = %s::bytea", (obj, fr"\x{i:02x}"))
assert cur.fetchone()[0] is True, i
assert cur.fetchone()[0] is True, i
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_load_1byte(conn, fmt_out):
cur = conn.cursor(format=fmt_out)
for i in range(0, 256):
assert cur.pgresult.fformat(0) == fmt_out
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_bytea_array(conn, fmt_in, fmt_out):
cur = conn.cursor(format=fmt_out)
a = [bytes(range(0, 256))]
- ph = "%s" if fmt_in == Format.TEXT else "%b"
- (res,) = cur.execute(f"select {ph}::bytea[]", (a,)).fetchone()
+ (res,) = cur.execute(f"select %{fmt_in}::bytea[]", (a,)).fetchone()
assert res == a
import pytest
+from psycopg3 import pq
from psycopg3.adapt import Format
-@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_in", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_uuid_dump(conn, fmt_in):
- ph = "%s" if fmt_in == Format.TEXT else "%b"
val = "12345678123456781234567812345679"
cur = conn.cursor()
- cur.execute(f"select {ph} = %s::uuid", (UUID(val), val))
+ cur.execute(f"select %{fmt_in} = %s::uuid", (UUID(val), val))
assert cur.fetchone()[0] is True
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", [pq.Format.TEXT, pq.Format.BINARY])
def test_uuid_load(conn, fmt_out):
cur = conn.cursor(format=fmt_out)
val = "12345678123456781234567812345679"