import re
import struct
+import logging
from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, Any, Generic, NamedTuple, TypeVar, cast
+from typing import TYPE_CHECKING, Any, Generic, NamedTuple, TypeAlias, TypeVar, cast
from functools import cache
from collections import namedtuple
from collections.abc import Callable, Sequence
if TYPE_CHECKING:
from .._connection_base import BaseConnection
+logger = logging.getLogger("psycopg")
+
_struct_oidlen = struct.Struct("!Ii")
_pack_oidlen = cast(Callable[[int, int], bytes], _struct_oidlen.pack)
_unpack_oidlen = cast(
)
T = TypeVar("T")
+InstanceMaker: TypeAlias = Callable[[Sequence[Any], Sequence[str]], T]
+SequenceMaker: TypeAlias = Callable[[T, Sequence[str]], Sequence[Any]]
class CompositeInfo(TypeInfo):
).format(regtype=cls._to_regtype(conn))
-class _SequenceDumper(RecursiveDumper):
- def _dump_sequence(
- self, obj: Sequence[Any], start: bytes, end: bytes, sep: bytes
- ) -> bytes:
- if not obj:
- return start + end
-
- parts: list[abc.Buffer] = [start]
+class TupleDumper(RecursiveDumper):
+ # Should be this, but it doesn't work
+ # oid = _oids.RECORD_OID
- for item in obj:
- if item is None:
- parts.append(sep)
- continue
+ def dump(self, obj: tuple[Any, ...]) -> Buffer | None:
+ return _dump_text_sequence(obj, self._tx)
- dumper = self._tx.get_dumper(item, PyFormat.from_pq(self.format))
- if (ad := dumper.dump(item)) is None:
- ad = b""
- elif not ad:
- ad = b'""'
- elif self._re_needs_quotes.search(ad):
- ad = b'"' + self._re_esc.sub(rb"\1\1", ad) + b'"'
- parts.append(ad)
- parts.append(sep)
+class _SequenceDumper(RecursiveDumper, Generic[T], ABC):
+ """
+ Base class for text dumpers taking an object and dumping it as a composite.
- parts[-1] = end
+ Abstract class: subclasses must specify the names of the fields making the
+ composite to return and the `make_sequence()` static method to convert the
+ object to dump to a sequence of values.
+ """
- return b"".join(parts)
+ # Subclasses must set this info
+ field_names: tuple[str, ...]
+ field_types: tuple[int, ...]
- _re_needs_quotes = re.compile(rb'[",\\\s()]')
- _re_esc = re.compile(rb"([\\\"])")
+ def dump(self, obj: T) -> bytes:
+ seq = type(self).make_sequence(obj, self.field_names)
+ return _dump_text_sequence(seq, self._tx)
+ @staticmethod
+ @abstractmethod
+ def make_sequence(obj: T, names: Sequence[str]) -> Sequence[Any]: ...
-class TupleDumper(_SequenceDumper):
- # Should be this, but it doesn't work
- # oid = _oids.RECORD_OID
- def dump(self, obj: tuple[Any, ...]) -> Buffer | None:
- return self._dump_sequence(obj, b"(", b")", b",")
+class _SequenceBinaryDumper(Dumper, Generic[T], ABC):
+ """
+ Base class for binary dumpers taking an object and dumping it as a composite.
+ Abstract class: subclasses must specify the names and types of the fields
+ making the target composite and the `make_sequence()` static method to
+ convert the object to dump to a sequence of values.
+ """
-class _SequenceBinaryDumper(Dumper):
format = pq.Format.BINARY
# Subclasses must set this info
+ field_names: tuple[str, ...]
field_types: tuple[int, ...]
- def __init__(self, cls: type, context: abc.AdaptContext | None = None):
+ def __init__(self, cls: type[T], context: abc.AdaptContext | None = None):
super().__init__(cls, context)
# Note: this class is not a RecursiveDumper because it would use the
nfields = len(self.field_types)
self._formats = (PyFormat.from_pq(self.format),) * nfields
- def dump(self, obj: tuple[Any, ...]) -> Buffer | None:
- out = bytearray(pack_len(len(obj)))
- adapted = self._tx.dump_sequence(obj, self._formats)
- for i in range(len(obj)):
- b = adapted[i]
- oid = self.field_types[i]
- if b is not None:
- out += _pack_oidlen(oid, len(b))
- out += b
- else:
- out += _pack_oidlen(oid, -1)
+ def dump(self, obj: T) -> Buffer | None:
+ seq = type(self).make_sequence(obj, self.field_names)
+ return _dump_binary_sequence(seq, self.field_types, self._formats, self._tx)
- return out
+ @staticmethod
+ @abstractmethod
+ def make_sequence(obj: T, names: Sequence[str]) -> Sequence[Any]: ...
class RecordLoader(RecursiveLoader):
info: CompositeInfo,
context: abc.AdaptContext | None = None,
factory: Callable[..., T] | None = None,
- make_instance: Callable[[Sequence[Any], Sequence[str]], T] | None = None,
+ make_instance: InstanceMaker[T] | None = None,
+ make_sequence: SequenceMaker[T] | None = None,
) -> None:
"""Register the adapters to load and dump a composite type.
:type context: `~psycopg.abc.AdaptContext` | `!None`
:param factory: Callable to convert the sequence of attributes read from
the composite into a Python object.
- :type factory: `!Callable[[Any, ...], T]` | `!None`
+ :type factory: `!Callable[..., T]` | `!None`
:param make_instance: optional function taking values and names as input and
returning the new type.
:type make_instance: `!Callable[[Sequence[Any], Sequence[str]], T]` | `!None`
+ :param make_sequence: optional function taking an instance and names as
+ input and returning the fields to dump.
+ :type make_sequence: `!Callable[[T, Sequence[str]], Sequence[Any]]` | `!None`
.. note::
# If the factory is a type, create and register dumpers for it
if isinstance(factory, type):
+
+ # Optimistically assume that the factory type is a sequence.
+ # If it is not, it will create a non-functioning dumper, but we don't
+ # risk backward incompatibility.
+ if not make_sequence:
+
+ if not issubclass(factory, Sequence):
+ logger.warning(
+ "the type %r is not a sequence: dumping these objects to the"
+ " database will fail. Please specify a `make_sequence`"
+ " argument in the `register_composite()` call",
+ factory.__name__,
+ )
+
+ def make_sequence(obj: T, name: Sequence[str]) -> Sequence[Any]:
+ raise TypeError(
+ f"{type(obj).__name__!r} objects cannot be dumped without"
+ " specifying 'make_sequence' in 'register_composite()'"
+ )
+
+ else:
+
+ def make_sequence(obj: T, name: Sequence[str]) -> Sequence[Any]:
+ return obj # type: ignore[return-value] # it's a sequence
+
+ type_name = factory.__name__
dumper: type[Dumper]
- dumper = _make_binary_dumper(info.name, info.oid, field_types)
+ dumper = _make_binary_dumper(
+ type_name, info.oid, field_names, field_types, make_sequence
+ )
adapters.register_dumper(factory, dumper)
# Default to the text dumper because it is more flexible
- dumper = _make_dumper(info.name, info.oid)
+ dumper = _make_dumper(
+ type_name, info.oid, field_names, field_types, make_sequence
+ )
adapters.register_dumper(factory, dumper)
info.python_type = factory
+ else:
+ if make_sequence:
+ raise TypeError(
+ "the factory {factory.__name__!r} is not a type: you cannot"
+ " create a dumper by specifying `make_sequence`."
+ )
+
def register_default_adapters(context: abc.AdaptContext) -> None:
adapters = context.adapters
return _make_nt(name, fields)
+def _dump_text_sequence(seq: Sequence[Any], tx: abc.Transformer) -> bytes:
+ if not seq:
+ return b"()"
+
+ parts: list[abc.Buffer] = [b"("]
+
+ for item in seq:
+ if item is None:
+ parts.append(b",")
+ continue
+
+ dumper = tx.get_dumper(item, PyFormat.TEXT)
+ if (ad := dumper.dump(item)) is None:
+ ad = b""
+ elif not ad:
+ ad = b'""'
+ elif _re_needs_quotes.search(ad):
+ ad = b'"' + _re_esc.sub(rb"\1\1", ad) + b'"'
+
+ parts.append(ad)
+ parts.append(b",")
+
+ parts[-1] = b")"
+
+ return b"".join(parts)
+
+
+_re_needs_quotes = re.compile(rb'[",\\\s()]')
+_re_esc = re.compile(rb"([\\\"])")
+
+
+def _dump_binary_sequence(
+ seq: Sequence[Any],
+ types: Sequence[int],
+ formats: Sequence[PyFormat],
+ tx: abc.Transformer,
+) -> bytearray:
+ out = bytearray(pack_len(len(seq)))
+ adapted = tx.dump_sequence(seq, formats)
+ for i in range(len(seq)):
+ b = adapted[i]
+ oid = types[i]
+ if b is not None:
+ out += _pack_oidlen(oid, len(b))
+ out += b
+ else:
+ out += _pack_oidlen(oid, -1)
+
+ return out
+
+
def _parse_text_record(data: abc.Buffer) -> list[bytes | None]:
"""
Split a non-empty representation of a composite type into components.
name: str,
field_names: tuple[str, ...],
field_types: tuple[int, ...],
- make_instance: Callable[[Sequence[Any], Sequence[str]], T],
+ make_instance: InstanceMaker[T],
) -> type[_CompositeLoader[T]]:
doc = f"Text loader for the '{name}' composite."
d = {
name: str,
field_names: tuple[str, ...],
field_types: tuple[int, ...],
- make_instance: Callable[[Sequence[Any], Sequence[str]], T] | None,
+ make_instance: InstanceMaker[T],
) -> type[_CompositeBinaryLoader[T]]:
doc = f"Binary loader for the '{name}' composite."
d = {
@cache
-def _make_dumper(name: str, oid: int) -> type[TupleDumper]:
- return type(f"{name.title()}Dumper", (TupleDumper,), {"oid": oid})
+def _make_dumper(
+ name: str,
+ oid: int,
+ field_names: tuple[str, ...],
+ field_types: tuple[int, ...],
+ make_sequence: SequenceMaker[T],
+) -> type[_SequenceDumper[T]]:
+ doc = f"Text dumper for the '{name}' composite."
+ d = {
+ "__doc__": doc,
+ "oid": oid,
+ "field_names": field_names,
+ "field_types": field_types,
+ "make_sequence": make_sequence,
+ }
+ return type(f"{name}Dumper", (_SequenceDumper,), d)
@cache
def _make_binary_dumper(
- name: str, oid: int, field_types: tuple[int, ...]
-) -> type[_SequenceBinaryDumper]:
- d = {"oid": oid, "field_types": field_types}
- return type(f"{name.title()}BinaryDumper", (_SequenceBinaryDumper,), d)
+ name: str,
+ oid: int,
+ field_names: tuple[str, ...],
+ field_types: tuple[int, ...],
+ make_sequence: SequenceMaker[T],
+) -> type[_SequenceBinaryDumper[T]]:
+ doc = f"Text dumper for the '{name}' composite."
+ d = {
+ "__doc__": doc,
+ "oid": oid,
+ "field_names": field_names,
+ "field_types": field_types,
+ "make_sequence": make_sequence,
+ }
+ return type(f"{name}BinaryDumper", (_SequenceBinaryDumper,), d)
+import logging
+
import pytest
from psycopg import postgres, pq, sql
from psycopg.adapt import PyFormat
from psycopg.postgres import types as builtins
from psycopg.types.range import Range
-from psycopg.types.composite import CompositeInfo, TupleDumper, register_composite
+from psycopg.types.composite import CompositeInfo, register_composite
from ..utils import eur
from ..fix_crdb import crdb_skip_message, is_crdb
assert isinstance(res[0].baz, float)
+class MyKeywordThing:
+ def __init__(self, *, foo, bar, baz):
+ self.foo, self.bar, self.baz = foo, bar, baz
+
+
@pytest.mark.parametrize("fmt_out", pq.Format)
def test_load_keyword_composite_factory(conn, testcomp, fmt_out):
info = CompositeInfo.fetch(conn, "testcomp")
- class MyKeywordThing:
- def __init__(self, *, foo, bar, baz):
- self.foo, self.bar, self.baz = foo, bar, baz
-
def make_instance(values, names):
return MyKeywordThing(**dict(zip(names, values)))
assert oid in conn.adapters._loaders[fmt]
-def test_type_dumper_registered(conn, testcomp):
- info = CompositeInfo.fetch(conn, "testcomp")
- register_composite(info, conn)
- assert issubclass(info.python_type, tuple)
- assert info.python_type.__name__ == "testcomp"
- d = conn.adapters.get_dumper(info.python_type, "s")
- assert issubclass(d, TupleDumper)
- assert d is not TupleDumper
-
- tc = info.python_type("foo", 42, 3.14)
- cur = conn.execute("select pg_typeof(%s)", [tc])
- assert cur.fetchone()[0] == "testcomp"
-
-
-def test_type_dumper_registered_binary(conn, testcomp):
+@pytest.mark.parametrize("fmt_in", PyFormat)
+def test_type_dumper_registered(conn, testcomp, fmt_in):
info = CompositeInfo.fetch(conn, "testcomp")
register_composite(info, conn)
assert issubclass(info.python_type, tuple)
assert info.python_type.__name__ == "testcomp"
+ assert conn.adapters.get_dumper(info.python_type, "s")
tc = info.python_type("foo", 42, 3.14)
- cur = conn.execute("select pg_typeof(%b)", [tc])
- assert cur.fetchone()[0] == "testcomp"
+ cur = conn.execute(
+ f"select pg_typeof(%(obj){fmt_in.value}), (%(obj){fmt_in.value}).bar",
+ {"obj": tc},
+ )
+ assert cur.fetchone() == ("testcomp", 42)
def test_callable_dumper_not_registered(conn, testcomp):
assert cur.fetchone()[0] == ("foo", 42, 3.14, 3.14)
+@pytest.mark.parametrize("fmt_in", PyFormat)
+def test_dump_no_sequence(conn, testcomp, fmt_in, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ def make_sequence(obj, names):
+ return [getattr(obj, attr) for attr in names]
+
+ info = CompositeInfo.fetch(conn, "testcomp")
+ register_composite(info, conn, factory=MyKeywordThing, make_sequence=make_sequence)
+ assert info.python_type is MyKeywordThing
+ assert not caplog.records
+
+ obj = MyKeywordThing(foo="foo", bar=42, baz=3.14)
+ cur = conn.execute(
+ f"select pg_typeof(%(obj){fmt_in.value}), (%(obj){fmt_in.value}).bar",
+ {"obj": obj},
+ )
+ assert cur.fetchone() == ("testcomp", 42)
+
+
+@pytest.mark.parametrize("fmt_in", PyFormat)
+def test_dump_no_sequence_failing(conn, testcomp, fmt_in, caplog):
+ caplog.set_level(logging.WARNING, logger="psycopg")
+
+ info = CompositeInfo.fetch(conn, "testcomp")
+ register_composite(info, conn, factory=MyKeywordThing)
+ assert info.python_type is MyKeywordThing
+ assert caplog.records
+ assert "'MyKeywordThing' is not a sequence" in caplog.records[0].message
+
+ obj = MyKeywordThing(foo="foo", bar=42, baz=3.14)
+ with pytest.raises(
+ TypeError, match="MyKeywordThing.*make_sequence.*register_composite"
+ ):
+ conn.execute(f"select pg_typeof(%{fmt_in.value})", [obj])
+
+
def test_no_info_error(conn):
with pytest.raises(TypeError, match="composite"):
register_composite(None, conn) # type: ignore[arg-type]