import re
import struct
-from typing import Any, Generator, List, Tuple
+from typing import Any, Generator, List, Optional, Tuple
from .. import errors as e
-from ..adapt import Format, Adapter, TypeCaster, Transformer, UnknownCaster
-from ..adapt import AdaptContext, TypeCasterType, TypeCasterFunc
+from ..adapt import Format, Adapter, TypeCaster, Transformer
+from ..adapt import AdaptContext
from .oids import builtins
TEXT_OID = builtins["text"].oid
class BaseListAdapter(Adapter):
def __init__(self, src: type, context: AdaptContext = None):
super().__init__(src, context)
- self.tx = Transformer(context)
+ self._tx = Transformer(context)
def _array_oid(self, base_oid: int) -> int:
"""
@Adapter.text(list)
-class ListAdapter(BaseListAdapter):
+class TextListAdapter(BaseListAdapter):
# from https://www.postgresql.org/docs/current/arrays.html#ARRAYS-IO
#
# The array output routine will put double quotes around element values if
elif item is None:
tokens.append(b"NULL")
else:
- ad = self.tx.adapt(item)
+ ad = self._tx.adapt(item)
if isinstance(ad, tuple):
if oid == 0:
oid = ad[1]
if dim == len(dims) - 1:
for item in L:
- ad = self.tx.adapt(item, Format.BINARY)
+ ad = self._tx.adapt(item, Format.BINARY)
if isinstance(ad, tuple):
if oid == 0:
oid = ad[1]
class ArrayCasterBase(TypeCaster):
- base_caster: TypeCasterType
+ base_oid: int
def __init__(self, oid: int, context: AdaptContext = None):
super().__init__(oid, context)
-
- self.caster_func: TypeCasterFunc
- if isinstance(self.base_caster, type):
- self.caster_func = self.base_caster(oid, context).cast
- else:
- self.caster_func = type(self).base_caster
+ self._tx = Transformer(context)
class ArrayCasterText(ArrayCasterBase):
+
# Tokenize an array representation into item and brackets
# TODO: currently recognise only , as delimiter. Should be configured
_re_parse = re.compile(
def cast(self, data: bytes) -> List[Any]:
rv = None
stack: List[Any] = []
+ cast = self._tx.get_cast_function(self.base_oid, Format.TEXT)
+
for m in self._re_parse.finditer(data):
t = m.group(1)
if t == b"{":
else:
if t.startswith(b'"'):
t = self._re_unescape.sub(br"\1", t[1:-1])
- v = self.caster_func(t)
+ v = cast(t)
stack[-1].append(v)
class ArrayCasterBinary(ArrayCasterBase):
- def __init__(self, oid: int, context: AdaptContext = None):
- super().__init__(oid, context)
- self.tx = Transformer(context)
-
def cast(self, data: bytes) -> List[Any]:
ndims, hasnull, oid = _struct_head.unpack_from(data[:12])
if not ndims:
return []
- fcast = self.tx.get_cast_function(oid, Format.BINARY)
+ fcast = self._tx.get_cast_function(oid, Format.BINARY)
p = 12 + 8 * ndims
dims = [
return agg(dims)
-class ArrayCaster(TypeCaster):
- @staticmethod
- def register(
- oid: int, # array oid
- caster: TypeCasterType,
- context: AdaptContext = None,
- format: Format = Format.TEXT,
- ) -> TypeCasterType:
- base = ArrayCasterText if format == Format.TEXT else ArrayCasterBinary
- name = f"{caster.__name__}_{format.name.lower()}_array"
- t = type(name, (base,), {"base_caster": caster})
- return TypeCaster.register(oid, t, context=context, format=format)
-
-
-class UnknownArrayCaster(ArrayCasterText):
- base_caster = UnknownCaster
+def register_array(
+ array_oid: int,
+ base_oid: int,
+ context: AdaptContext = None,
+ name: Optional[str] = None,
+) -> None:
+ if not name:
+ name = f"oid{base_oid}"
+
+ for format, base in (
+ (Format.TEXT, ArrayCasterText),
+ (Format.BINARY, ArrayCasterBinary),
+ ):
+ tcname = f"{name}_array_{format.name.lower()}"
+ t = type(tcname, (base,), {"base_oid": base_oid})
+ TypeCaster.register(array_oid, t, context=context, format=format)
+
+
+# Register associations between array and base oids
+for t in builtins:
+ if t.array_oid and (
+ (t.oid, Format.TEXT) in TypeCaster.globals
+ or (t.oid, Format.BINARY) in TypeCaster.globals
+ ):
+ register_array(t.array_oid, t.oid, name=t.name)
from ..adapt import Adapter, TypeCaster
from .oids import builtins
-from .array import ArrayCaster
FLOAT8_OID = builtins["float8"].oid
NUMERIC_OID = builtins["numeric"].oid
@TypeCaster.text(builtins["int4"].oid)
@TypeCaster.text(builtins["int8"].oid)
@TypeCaster.text(builtins["oid"].oid)
-@ArrayCaster.text(builtins["int2"].array_oid)
-@ArrayCaster.text(builtins["int4"].array_oid)
-@ArrayCaster.text(builtins["int8"].array_oid)
-@ArrayCaster.text(builtins["oid"].array_oid)
def cast_int(data: bytes) -> int:
return int(_decode(data)[0])
@TypeCaster.binary(builtins["int2"].oid)
-@ArrayCaster.binary(builtins["int2"].array_oid)
def cast_binary_int2(data: bytes) -> int:
rv: int = _int2_struct.unpack(data)[0]
return rv
@TypeCaster.binary(builtins["int4"].oid)
-@ArrayCaster.binary(builtins["int4"].array_oid)
def cast_binary_int4(data: bytes) -> int:
rv: int = _int4_struct.unpack(data)[0]
return rv
@TypeCaster.binary(builtins["int8"].oid)
-@ArrayCaster.binary(builtins["int8"].array_oid)
def cast_binary_int8(data: bytes) -> int:
rv: int = _int8_struct.unpack(data)[0]
return rv
@TypeCaster.binary(builtins["oid"].oid)
-@ArrayCaster.binary(builtins["oid"].array_oid)
def cast_binary_oid(data: bytes) -> int:
rv: int = _oid_struct.unpack(data)[0]
return rv
import pytest
import psycopg3
from psycopg3.types import builtins
-from psycopg3.adapt import TypeCaster, UnknownCaster, Format, Transformer
-from psycopg3.types.array import UnknownArrayCaster, ArrayCaster
+from psycopg3.adapt import Format, Transformer
+from psycopg3.types.array import register_array
tests_str = [
assert cur.fetchone()[0] == want
-def test_unknown(conn):
- # unknown for real
- assert builtins["aclitem"].array_oid not in TypeCaster.globals
- TypeCaster.register(
- builtins["aclitem"].array_oid, UnknownArrayCaster, context=conn
- )
- cur = conn.cursor()
- cur.execute("select '{postgres=arwdDxt/postgres}'::aclitem[]")
- res = cur.fetchone()[0]
- assert res == ["postgres=arwdDxt/postgres"]
-
-
def test_array_register(conn):
+ # unknown for real
cur = conn.cursor()
cur.execute("select '{postgres=arwdDxt/postgres}'::aclitem[]")
res = cur.fetchone()[0]
assert res == "{postgres=arwdDxt/postgres}"
- ArrayCaster.register(
- builtins["aclitem"].array_oid, UnknownCaster, context=conn
+ register_array(
+ builtins["aclitem"].array_oid, builtins["aclitem"].oid, context=conn
)
cur.execute("select '{postgres=arwdDxt/postgres}'::aclitem[]")
res = cur.fetchone()[0]