AdaptContext = Union[None, BaseConnection, BaseCursor, "Transformer"]
MaybeOid = Union[Optional[bytes], Tuple[Optional[bytes], int]]
-AdapterFunc = Callable[[Any], MaybeOid]
-AdapterType = Union[Type["Adapter"], AdapterFunc]
-AdaptersMap = Dict[Tuple[type, Format], AdapterType]
+DumpFunc = Callable[[Any], MaybeOid]
+DumperType = Union[Type["Dumper"], DumpFunc]
+DumpersMap = Dict[Tuple[type, Format], DumperType]
-TypeCasterFunc = Callable[[bytes], Any]
-TypeCasterType = Union[Type["TypeCaster"], TypeCasterFunc]
-TypeCastersMap = Dict[Tuple[int, Format], TypeCasterType]
+LoadFunc = Callable[[bytes], Any]
+LoaderType = Union[Type["Loader"], LoadFunc]
+LoadersMap = Dict[Tuple[int, Format], LoaderType]
-class Adapter:
- globals: AdaptersMap = {}
+class Dumper:
+ globals: DumpersMap = {}
connection: Optional[BaseConnection]
def __init__(self, src: type, context: AdaptContext = None):
self.context = context
self.connection = _connection_from_context(context)
- def adapt(self, obj: Any) -> Union[bytes, Tuple[bytes, int]]:
+ def dump(self, obj: Any) -> Union[bytes, Tuple[bytes, int]]:
raise NotImplementedError()
@classmethod
def register(
cls,
src: type,
- adapter: AdapterType,
+ dumper: DumperType,
context: AdaptContext = None,
format: Format = Format.TEXT,
- ) -> AdapterType:
+ ) -> DumperType:
if not isinstance(src, type):
raise TypeError(
- f"adapters should be registered on classes, got {src} instead"
+ f"dumpers should be registered on classes, got {src} instead"
)
if not (
- callable(adapter)
- or (isinstance(adapter, type) and issubclass(adapter, Adapter))
+ callable(dumper)
+ or (isinstance(dumper, type) and issubclass(dumper, Dumper))
):
raise TypeError(
- f"adapters should be callable or Adapter subclasses,"
- f" got {adapter} instead"
+ f"dumpers should be callable or Dumper subclasses,"
+ f" got {dumper} instead"
)
- where = context.adapters if context is not None else Adapter.globals
- where[src, format] = adapter
- return adapter
+ where = context.dumpers if context is not None else Dumper.globals
+ where[src, format] = dumper
+ return dumper
@classmethod
def register_binary(
- cls, src: type, adapter: AdapterType, context: AdaptContext = None,
- ) -> AdapterType:
- return cls.register(src, adapter, context, format=Format.BINARY)
+ cls, src: type, dumper: DumperType, context: AdaptContext = None,
+ ) -> DumperType:
+ return cls.register(src, dumper, context, format=Format.BINARY)
@classmethod
- def text(cls, src: type) -> Callable[[AdapterType], AdapterType]:
- def text_(adapter: AdapterType) -> AdapterType:
- cls.register(src, adapter)
- return adapter
+ def text(cls, src: type) -> Callable[[DumperType], DumperType]:
+ def text_(dumper: DumperType) -> DumperType:
+ cls.register(src, dumper)
+ return dumper
return text_
@classmethod
- def binary(cls, src: type) -> Callable[[AdapterType], AdapterType]:
- def binary_(adapter: AdapterType) -> AdapterType:
- cls.register_binary(src, adapter)
- return adapter
+ def binary(cls, src: type) -> Callable[[DumperType], DumperType]:
+ def binary_(dumper: DumperType) -> DumperType:
+ cls.register_binary(src, dumper)
+ return dumper
return binary_
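# Editor's sketch, not part of this changeset: with the class renamed to
# Dumper, a dumper is registered per Python type and may be either a Dumper
# subclass or a bare callable returning bytes or a (bytes, oid) pair (see
# MaybeOid above). The uuid example is hypothetical and only illustrates the
# decorator form defined just above.
import uuid

@Dumper.text(uuid.UUID)
def dump_uuid(obj: uuid.UUID) -> bytes:
    # returning bare bytes is allowed by DumpFunc; return a (bytes, oid)
    # pair to declare the PostgreSQL type explicitly
    return str(obj).encode("ascii")

# the same callable can also be registered for a single connection or cursor:
# Dumper.register(uuid.UUID, dump_uuid, conn)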
-class TypeCaster:
- globals: TypeCastersMap = {}
+class Loader:
+ globals: LoadersMap = {}
connection: Optional[BaseConnection]
def __init__(self, oid: int, context: AdaptContext = None):
self.context = context
self.connection = _connection_from_context(context)
- def cast(self, data: bytes) -> Any:
+ def load(self, data: bytes) -> Any:
raise NotImplementedError()
@classmethod
def register(
cls,
oid: int,
- caster: TypeCasterType,
+ loader: LoaderType,
context: AdaptContext = None,
format: Format = Format.TEXT,
- ) -> TypeCasterType:
+ ) -> LoaderType:
if not isinstance(oid, int):
raise TypeError(
- f"typecasters should be registered on oid, got {oid} instead"
+            f"loaders should be registered on an oid, got {oid} instead"
)
if not (
- callable(caster)
- or (isinstance(caster, type) and issubclass(caster, TypeCaster))
+ callable(loader)
+ or (isinstance(loader, type) and issubclass(loader, Loader))
):
raise TypeError(
- f"adapters should be callable or TypeCaster subclasses,"
- f" got {caster} instead"
+            f"loaders should be callable or Loader subclasses,"
+ f" got {loader} instead"
)
- where = context.casters if context is not None else TypeCaster.globals
- where[oid, format] = caster
- return caster
+ where = context.loaders if context is not None else Loader.globals
+ where[oid, format] = loader
+ return loader
@classmethod
def register_binary(
- cls, oid: int, caster: TypeCasterType, context: AdaptContext = None,
- ) -> TypeCasterType:
- return cls.register(oid, caster, context, format=Format.BINARY)
+ cls, oid: int, loader: LoaderType, context: AdaptContext = None,
+ ) -> LoaderType:
+ return cls.register(oid, loader, context, format=Format.BINARY)
@classmethod
- def text(cls, oid: int) -> Callable[[TypeCasterType], TypeCasterType]:
- def text_(caster: TypeCasterType) -> TypeCasterType:
- cls.register(oid, caster)
- return caster
+ def text(cls, oid: int) -> Callable[[LoaderType], LoaderType]:
+ def text_(loader: LoaderType) -> LoaderType:
+ cls.register(oid, loader)
+ return loader
return text_
@classmethod
- def binary(cls, oid: int) -> Callable[[TypeCasterType], TypeCasterType]:
- def binary_(caster: TypeCasterType) -> TypeCasterType:
- cls.register_binary(oid, caster)
- return caster
+ def binary(cls, oid: int) -> Callable[[LoaderType], LoaderType]:
+ def binary_(loader: LoaderType) -> LoaderType:
+ cls.register_binary(oid, loader)
+ return loader
return binary_
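# Editor's sketch, not part of this changeset: loaders are keyed on the result
# oid rather than on a Python type. Mirroring the pattern used in the tests
# below, the text loader for bool can be overridden for a single connection
# ("conn" is assumed to be an open connection):
from psycopg3.types.oids import builtins

Loader.register(builtins["bool"].oid, lambda data: data == b"t", conn)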
def __init__(self, context: AdaptContext = None):
self.connection: Optional[BaseConnection]
- self.adapters: AdaptersMap
- self.casters: TypeCastersMap
- self._adapters_maps: List[AdaptersMap] = []
- self._casters_maps: List[TypeCastersMap] = []
+ self.dumpers: DumpersMap
+ self.loaders: LoadersMap
+ self._dumpers_maps: List[DumpersMap] = []
+ self._loaders_maps: List[LoadersMap] = []
self._setup_context(context)
- # mapping class, fmt -> adaptation function
- self._adapt_funcs: Dict[Tuple[type, Format], AdapterFunc] = {}
+ # mapping class, fmt -> dump function
+ self._dump_funcs: Dict[Tuple[type, Format], DumpFunc] = {}
- # mapping oid, fmt -> cast function
- self._cast_funcs: Dict[Tuple[int, Format], TypeCasterFunc] = {}
+ # mapping oid, fmt -> load function
+ self._load_funcs: Dict[Tuple[int, Format], LoadFunc] = {}
- # sequence of cast function from value to python
+ # sequence of load functions from value to python
    # matching the length of the result columns
- self._row_casters: List[TypeCasterFunc] = []
+ self._row_loaders: List[LoadFunc] = []
def _setup_context(self, context: AdaptContext) -> None:
if context is None:
self.connection = None
- self.adapters = {}
- self.casters = {}
- self._adapters_maps = [self.adapters]
- self._casters_maps = [self.casters]
+ self.dumpers = {}
+ self.loaders = {}
+ self._dumpers_maps = [self.dumpers]
+ self._loaders_maps = [self.loaders]
elif isinstance(context, Transformer):
            # A transformer created from another transformer: it usually
            # happens for nested types, so share the entire state of the parent
self.connection = context.connection
- self.adapters = context.adapters
- self.casters = context.casters
- self._adapters_maps.extend(context._adapters_maps)
- self._casters_maps.extend(context._casters_maps)
+ self.dumpers = context.dumpers
+ self.loaders = context.loaders
+ self._dumpers_maps.extend(context._dumpers_maps)
+ self._loaders_maps.extend(context._loaders_maps)
# the global maps are already in the lists
return
elif isinstance(context, BaseCursor):
self.connection = context.conn
- self.adapters = {}
- self._adapters_maps.extend(
- (self.adapters, context.adapters, self.connection.adapters)
+ self.dumpers = {}
+ self._dumpers_maps.extend(
+ (self.dumpers, context.dumpers, self.connection.dumpers)
)
- self.casters = {}
- self._casters_maps.extend(
- (self.casters, context.casters, self.connection.casters)
+ self.loaders = {}
+ self._loaders_maps.extend(
+ (self.loaders, context.loaders, self.connection.loaders)
)
elif isinstance(context, BaseConnection):
self.connection = context
- self.adapters = {}
- self._adapters_maps.extend((self.adapters, context.adapters))
- self.casters = {}
- self._casters_maps.extend((self.casters, context.casters))
+ self.dumpers = {}
+ self._dumpers_maps.extend((self.dumpers, context.dumpers))
+ self.loaders = {}
+ self._loaders_maps.extend((self.loaders, context.loaders))
- self._adapters_maps.append(Adapter.globals)
- self._casters_maps.append(TypeCaster.globals)
+ self._dumpers_maps.append(Dumper.globals)
+ self._loaders_maps.append(Loader.globals)
- def adapt_sequence(
+ def dump_sequence(
self, objs: Iterable[Any], formats: Iterable[Format]
) -> Tuple[List[Optional[bytes]], List[int]]:
out = []
types = []
for var, fmt in zip(objs, formats):
- data = self.adapt(var, fmt)
+ data = self.dump(var, fmt)
if isinstance(data, tuple):
oid = data[1]
data = data[0]
return out, types
- def adapt(self, obj: None, format: Format = Format.TEXT) -> MaybeOid:
+    def dump(self, obj: Any, format: Format = Format.TEXT) -> MaybeOid:
if obj is None:
return None, TEXT_OID
src = type(obj)
- func = self.get_adapt_function(src, format)
+ func = self.get_dump_function(src, format)
return func(obj)
- def get_adapt_function(self, src: type, format: Format) -> AdapterFunc:
+ def get_dump_function(self, src: type, format: Format) -> DumpFunc:
key = (src, format)
try:
- return self._adapt_funcs[key]
+ return self._dump_funcs[key]
except KeyError:
pass
- adapter = self.lookup_adapter(src, format)
- func: AdapterFunc
- if isinstance(adapter, type):
- func = adapter(src, self).adapt
+ dumper = self.lookup_dumper(src, format)
+ func: DumpFunc
+ if isinstance(dumper, type):
+ func = dumper(src, self).dump
else:
- func = adapter
+ func = dumper
- self._adapt_funcs[key] = func
+ self._dump_funcs[key] = func
return func
- def lookup_adapter(self, src: type, format: Format) -> AdapterType:
+ def lookup_dumper(self, src: type, format: Format) -> DumperType:
key = (src, format)
- for amap in self._adapters_maps:
+ for amap in self._dumpers_maps:
if key in amap:
return amap[key]
)
def set_row_types(self, types: Iterable[Tuple[int, Format]]) -> None:
- rc = self._row_casters = []
+ rc = self._row_loaders = []
for oid, fmt in types:
- rc.append(self.get_cast_function(oid, fmt))
+ rc.append(self.get_load_function(oid, fmt))
- def cast_sequence(
+ def load_sequence(
self, record: Iterable[Optional[bytes]]
) -> Generator[Any, None, None]:
- for val, caster in zip(record, self._row_casters):
+ for val, loader in zip(record, self._row_loaders):
if val is not None:
- yield caster(val)
+ yield loader(val)
else:
yield None
- def cast(self, data: bytes, oid: int, format: Format = Format.TEXT) -> Any:
+    def load(self, data: Optional[bytes], oid: int, format: Format = Format.TEXT) -> Any:
if data is not None:
- f = self.get_cast_function(oid, format)
+ f = self.get_load_function(oid, format)
return f(data)
else:
return None
- def get_cast_function(self, oid: int, format: Format) -> TypeCasterFunc:
+ def get_load_function(self, oid: int, format: Format) -> LoadFunc:
key = (oid, format)
try:
- return self._cast_funcs[key]
+ return self._load_funcs[key]
except KeyError:
pass
- caster = self.lookup_caster(oid, format)
- func: TypeCasterFunc
- if isinstance(caster, type):
- func = caster(oid, self).cast
+ loader = self.lookup_loader(oid, format)
+ func: LoadFunc
+ if isinstance(loader, type):
+ func = loader(oid, self).load
else:
- func = caster
+ func = loader
- self._cast_funcs[key] = func
+ self._load_funcs[key] = func
return func
- def lookup_caster(self, oid: int, format: Format) -> TypeCasterType:
+ def lookup_loader(self, oid: int, format: Format) -> LoaderType:
key = (oid, format)
- for tcmap in self._casters_maps:
+ for tcmap in self._loaders_maps:
if key in tcmap:
return tcmap[key]
- return TypeCaster.globals[INVALID_OID, format]
+ return Loader.globals[INVALID_OID, format]
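# Editor's sketch, not part of this changeset: the renamed Transformer API in
# one round trip, assuming psycopg3.types has been imported so the builtin
# handlers shown further down are registered.
from psycopg3.types.oids import builtins

tx = Transformer()
data, oid = tx.dump(42)               # (b"42", numeric oid), see dump_int below
tx.load(b"42", builtins["int4"].oid)  # -> 42, see load_int below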
-@TypeCaster.text(INVALID_OID)
-class UnknownCaster(TypeCaster):
+@Loader.text(INVALID_OID)
+class UnknownLoader(Loader):
"""
Fallback object to convert unknown types to Python
"""
else:
self.decode = codecs.lookup("utf8").decode
- def cast(self, data: bytes) -> str:
+ def load(self, data: bytes) -> str:
return self.decode(data)[0]
-@TypeCaster.binary(INVALID_OID)
-def cast_unknown(data: bytes) -> bytes:
+@Loader.binary(INVALID_OID)
+def load_unknown(data: bytes) -> bytes:
return data
RV = TypeVar("RV")
if TYPE_CHECKING:
- from .adapt import AdaptersMap, TypeCastersMap
+ from .adapt import DumpersMap, LoadersMap
class BaseConnection:
def __init__(self, pgconn: pq.PGconn):
self.pgconn = pgconn
self.cursor_factory = cursor.BaseCursor
- self.adapters: AdaptersMap = {}
- self.casters: TypeCastersMap = {}
+ self.dumpers: DumpersMap = {}
+ self.loaders: LoadersMap = {}
# name of the postgres encoding (in bytes)
self._pgenc = b""
AsyncConnection,
QueryGen,
)
- from .adapt import AdaptersMap, TypeCastersMap
+ from .adapt import DumpersMap, LoadersMap
class BaseCursor:
def __init__(self, conn: "BaseConnection", binary: bool = False):
self.conn = conn
self.binary = binary
- self.adapters: AdaptersMap = {}
- self.casters: TypeCastersMap = {}
+ self.dumpers: DumpersMap = {}
+ self.loaders: LoadersMap = {}
self._reset()
def _reset(self) -> None:
assert isinstance(vars, Mapping)
vars = reorder_params(vars, order)
assert isinstance(vars, Sequence)
- params, types = self._transformer.adapt_sequence(vars, formats)
+ params, types = self._transformer.dump_sequence(vars, formats)
self.conn.pgconn.send_query_params(
query,
params,
else:
return None
- def _cast_row(self, n: int) -> Optional[Tuple[Any, ...]]:
+ def _load_row(self, n: int) -> Optional[Tuple[Any, ...]]:
res = self.pgresult
if res is None:
return None
return None
return tuple(
- self._transformer.cast_sequence(
+ self._transformer.load_sequence(
res.get_value(n, i) for i in range(res.nfields)
)
)
return self
def fetchone(self) -> Optional[Sequence[Any]]:
- rv = self._cast_row(self._pos)
+ rv = self._load_row(self._pos)
if rv is not None:
self._pos += 1
return rv
return self
async def fetchone(self) -> Optional[Sequence[Any]]:
- rv = self._cast_row(self._pos)
+ rv = self._load_row(self._pos)
if rv is not None:
self._pos += 1
return rv
"SHIFT_JIS_2004": "shift_jis_2004",
"SJIS": "shift_jis",
# this actually means no encoding, see PostgreSQL docs
- # it is special-cased by the text typecaster.
+ # it is special-cased by the text loader.
"SQL_ASCII": "ascii",
"UHC": "cp949",
"UTF8": "utf-8",
from typing import Any, Generator, List, Optional, Tuple
from .. import errors as e
-from ..adapt import Format, Adapter, TypeCaster, Transformer
+from ..adapt import Format, Dumper, Loader, Transformer
from ..adapt import AdaptContext
from .oids import builtins
TEXT_ARRAY_OID = builtins["text"].array_oid
-class BaseListAdapter(Adapter):
+class BaseListDumper(Dumper):
def __init__(self, src: type, context: AdaptContext = None):
super().__init__(src, context)
self._tx = Transformer(context)
return oid or TEXT_ARRAY_OID
-@Adapter.text(list)
-class TextListAdapter(BaseListAdapter):
+@Dumper.text(list)
+class TextListDumper(BaseListDumper):
# from https://www.postgresql.org/docs/current/arrays.html#ARRAYS-IO
#
# The array output routine will put double quotes around element values if
# backslash-escaped.
_re_escape = re.compile(br'(["\\])')
- def adapt(self, obj: List[Any]) -> Tuple[bytes, int]:
+ def dump(self, obj: List[Any]) -> Tuple[bytes, int]:
tokens: List[bytes] = []
oid = 0
- def adapt_list(obj: List[Any]) -> None:
+ def dump_list(obj: List[Any]) -> None:
nonlocal oid
if not obj:
tokens.append(b"{")
for item in obj:
if isinstance(item, list):
- adapt_list(item)
+ dump_list(item)
elif item is None:
tokens.append(b"NULL")
else:
- ad = self._tx.adapt(item)
+ ad = self._tx.dump(item)
if isinstance(ad, tuple):
if oid == 0:
oid = ad[1]
tokens[-1] = b"}"
- adapt_list(obj)
+ dump_list(obj)
return b"".join(tokens), self._array_oid(oid)
-@Adapter.binary(list)
-class BinaryListAdapter(BaseListAdapter):
- def adapt(self, obj: List[Any]) -> Tuple[bytes, int]:
+@Dumper.binary(list)
+class BinaryListDumper(BaseListDumper):
+ def dump(self, obj: List[Any]) -> Tuple[bytes, int]:
if not obj:
return _struct_head.pack(0, 0, TEXT_OID), TEXT_ARRAY_OID
calc_dims(obj)
- def adapt_list(L: List[Any], dim: int) -> None:
+ def dump_list(L: List[Any], dim: int) -> None:
nonlocal oid, hasnull
if len(L) != dims[dim]:
raise e.DataError("nested lists have inconsistent lengths")
if dim == len(dims) - 1:
for item in L:
- ad = self._tx.adapt(item, Format.BINARY)
+ ad = self._tx.dump(item, Format.BINARY)
if isinstance(ad, tuple):
if oid == 0:
oid = ad[1]
raise e.DataError(
"nested lists have inconsistent depths"
)
- adapt_list(item, dim + 1) # type: ignore
+ dump_list(item, dim + 1) # type: ignore
- adapt_list(obj, 0)
+ dump_list(obj, 0)
if oid == 0:
oid = TEXT_OID
return b"".join(data), self._array_oid(oid)
-class ArrayCasterBase(TypeCaster):
+class BaseArrayLoader(Loader):
base_oid: int
def __init__(self, oid: int, context: AdaptContext = None):
self._tx = Transformer(context)
-class ArrayCasterText(ArrayCasterBase):
+class TextArrayLoader(BaseArrayLoader):
    # Tokenize an array representation into items and brackets
    # TODO: currently recognises only ',' as the delimiter; should be configurable
"""
)
- def cast(self, data: bytes) -> List[Any]:
+ def load(self, data: bytes) -> List[Any]:
rv = None
stack: List[Any] = []
- cast = self._tx.get_cast_function(self.base_oid, Format.TEXT)
+ cast = self._tx.get_load_function(self.base_oid, Format.TEXT)
for m in self._re_parse.finditer(data):
t = m.group(1)
_struct_len = struct.Struct("!i")
-class ArrayCasterBinary(ArrayCasterBase):
- def cast(self, data: bytes) -> List[Any]:
+class BinaryArrayLoader(BaseArrayLoader):
+ def load(self, data: bytes) -> List[Any]:
ndims, hasnull, oid = _struct_head.unpack_from(data[:12])
if not ndims:
return []
- fcast = self._tx.get_cast_function(oid, Format.BINARY)
+ fcast = self._tx.get_load_function(oid, Format.BINARY)
p = 12 + 8 * ndims
dims = [
name = f"oid{base_oid}"
for format, base in (
- (Format.TEXT, ArrayCasterText),
- (Format.BINARY, ArrayCasterBinary),
+ (Format.TEXT, TextArrayLoader),
+ (Format.BINARY, BinaryArrayLoader),
):
- tcname = f"{name.title()}Array{format.name.title()}Caster"
+ tcname = f"{name.title()}Array{format.name.title()}Loader"
t = type(tcname, (base,), {"base_oid": base_oid})
- TypeCaster.register(array_oid, t, context=context, format=format)
+ Loader.register(array_oid, t, context=context, format=format)
def register_all_arrays() -> None:
"""
- Associate the array oid of all the types in TypeCaster.globals.
+    Register array loaders for the array oid of all the types in Loader.globals.
This function is designed to be called once at import time, after having
- registered all the base casters.
+ registered all the base loaders.
"""
for t in builtins:
if t.array_oid and (
- (t.oid, Format.TEXT) in TypeCaster.globals
- or (t.oid, Format.BINARY) in TypeCaster.globals
+ (t.oid, Format.TEXT) in Loader.globals
+ or (t.oid, Format.BINARY) in Loader.globals
):
register(t.array_oid, t.oid, name=t.name)
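# Editor's note, not part of this changeset: once register_all_arrays() has run
# at import time, arrays of any registered base type load transparently, e.g.
# (mirroring the list tests below, with "conn" an open connection):
cur = conn.cursor()
cur.execute("select '{1,2,3}'::int[]")
cur.fetchone()  # -> ([1, 2, 3],)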
from typing import Optional, TYPE_CHECKING
from . import array
-from ..adapt import Format, Adapter, TypeCaster, Transformer, AdaptContext
+from ..adapt import Format, Dumper, Loader, Transformer, AdaptContext
from .oids import builtins, TypeInfo
if TYPE_CHECKING:
info.name, [f.name for f in info.fields]
)
- # generate and register a customized text typecaster
- caster = type(
- f"{info.name.title()}Caster",
- (CompositeCaster,),
+ # generate and register a customized text loader
+ loader = type(
+ f"{info.name.title()}Loader",
+ (CompositeLoader,),
{
"factory": factory,
"fields_types": tuple(f.type_oid for f in info.fields),
},
)
- TypeCaster.register(info.oid, caster, context=context, format=Format.TEXT)
+ Loader.register(info.oid, loader, context=context, format=Format.TEXT)
- # generate and register a customized binary typecaster
- caster = type(
- f"{info.name.title()}BinaryCaster",
- (CompositeBinaryCaster,),
+ # generate and register a customized binary loader
+ loader = type(
+ f"Binary{info.name.title()}Loader",
+ (BinaryCompositeLoader,),
{"factory": factory},
)
- TypeCaster.register(
- info.oid, caster, context=context, format=Format.BINARY
- )
+ Loader.register(info.oid, loader, context=context, format=Format.BINARY)
if info.array_oid:
array.register(
"""
-@Adapter.text(tuple)
-class TextTupleAdapter(Adapter):
+@Dumper.text(tuple)
+class TextTupleDumper(Dumper):
def __init__(self, src: type, context: AdaptContext = None):
super().__init__(src, context)
self._tx = Transformer(context)
- def adapt(self, obj: Tuple[Any, ...]) -> Tuple[bytes, int]:
+ def dump(self, obj: Tuple[Any, ...]) -> Tuple[bytes, int]:
if not obj:
return b"()", TEXT_OID
parts.append(b",")
continue
- ad = self._tx.adapt(item)
+ ad = self._tx.dump(item)
if isinstance(ad, tuple):
ad = ad[0]
if ad is None:
_re_escape = re.compile(br"([\"])")
-class BaseCompositeCaster(TypeCaster):
+class BaseCompositeLoader(Loader):
def __init__(self, oid: int, context: AdaptContext = None):
super().__init__(oid, context)
self._tx = Transformer(context)
-@TypeCaster.text(builtins["record"].oid)
-class RecordCaster(BaseCompositeCaster):
- def cast(self, data: bytes) -> Tuple[Any, ...]:
- cast = self._tx.get_cast_function(TEXT_OID, format=Format.TEXT)
+@Loader.text(builtins["record"].oid)
+class RecordLoader(BaseCompositeLoader):
+ def load(self, data: bytes) -> Tuple[Any, ...]:
+ cast = self._tx.get_load_function(TEXT_OID, format=Format.TEXT)
return tuple(
cast(token) if token is not None else None
for token in self._parse_record(data)
_struct_oidlen = struct.Struct("!Ii")
-@TypeCaster.binary(builtins["record"].oid)
-class RecordBinaryCaster(BaseCompositeCaster):
+@Loader.binary(builtins["record"].oid)
+class BinaryRecordLoader(BaseCompositeLoader):
_types_set = False
- def cast(self, data: bytes) -> Tuple[Any, ...]:
+ def load(self, data: bytes) -> Tuple[Any, ...]:
if not self._types_set:
self._config_types(data)
self._types_set = True
return tuple(
- self._tx.cast_sequence(
+ self._tx.load_sequence(
data[offset : offset + length] if length != -1 else None
for _, offset, length in self._walk_record(data)
)
)
-class CompositeCaster(RecordCaster):
+class CompositeLoader(RecordLoader):
factory: Callable[..., Any]
fields_types: Tuple[int, ...]
_types_set = False
- def cast(self, data: bytes) -> Any:
+ def load(self, data: bytes) -> Any:
if not self._types_set:
self._config_types(data)
self._types_set = True
return type(self).factory(
- *self._tx.cast_sequence(self._parse_record(data))
+ *self._tx.load_sequence(self._parse_record(data))
)
def _config_types(self, data: bytes) -> None:
self._tx.set_row_types((oid, Format.TEXT) for oid in self.fields_types)
-class CompositeBinaryCaster(RecordBinaryCaster):
+class BinaryCompositeLoader(BinaryRecordLoader):
factory: Callable[..., Any]
- def cast(self, data: bytes) -> Any:
- r = super().cast(data)
+ def load(self, data: bytes) -> Any:
+ r = super().load(data)
return type(self).factory(*r)
"""
-Adapters of numeric types.
+Adapters for numeric types.
"""
# Copyright (C) 2020 The Psycopg Team
from decimal import Decimal
from typing import Tuple
-from ..adapt import Adapter, TypeCaster
+from ..adapt import Dumper, Loader
from .oids import builtins
FLOAT8_OID = builtins["float8"].oid
_float8_struct = struct.Struct("!d")
-@Adapter.text(int)
-def adapt_int(obj: int) -> Tuple[bytes, int]:
+@Dumper.text(int)
+def dump_int(obj: int) -> Tuple[bytes, int]:
    # We don't know how big the Python int is, so return a type big enough (numeric)
return _encode(str(obj))[0], NUMERIC_OID
-@Adapter.text(float)
-def adapt_float(obj: float) -> Tuple[bytes, int]:
+@Dumper.text(float)
+def dump_float(obj: float) -> Tuple[bytes, int]:
    # Unlike int, a Python float always fits in a float8
return _encode(str(obj))[0], FLOAT8_OID
-@Adapter.text(Decimal)
-def adapt_decimal(obj: Decimal) -> Tuple[bytes, int]:
+@Dumper.text(Decimal)
+def dump_decimal(obj: Decimal) -> Tuple[bytes, int]:
return _encode(str(obj))[0], NUMERIC_OID
-_bool_adapt = {
+_bool_dump = {
True: (b"t", builtins["bool"].oid),
False: (b"f", builtins["bool"].oid),
}
-_bool_binary_adapt = {
+_bool_binary_dump = {
True: (b"\x01", builtins["bool"].oid),
False: (b"\x00", builtins["bool"].oid),
}
-@Adapter.text(bool)
-def adapt_bool(obj: bool) -> Tuple[bytes, int]:
- return _bool_adapt[obj]
+@Dumper.text(bool)
+def dump_bool(obj: bool) -> Tuple[bytes, int]:
+ return _bool_dump[obj]
-@Adapter.binary(bool)
-def adapt_binary_bool(obj: bool) -> Tuple[bytes, int]:
- return _bool_binary_adapt[obj]
+@Dumper.binary(bool)
+def dump_binary_bool(obj: bool) -> Tuple[bytes, int]:
+ return _bool_binary_dump[obj]
-@TypeCaster.text(builtins["int2"].oid)
-@TypeCaster.text(builtins["int4"].oid)
-@TypeCaster.text(builtins["int8"].oid)
-@TypeCaster.text(builtins["oid"].oid)
-def cast_int(data: bytes) -> int:
+@Loader.text(builtins["int2"].oid)
+@Loader.text(builtins["int4"].oid)
+@Loader.text(builtins["int8"].oid)
+@Loader.text(builtins["oid"].oid)
+def load_int(data: bytes) -> int:
return int(_decode(data)[0])
-@TypeCaster.binary(builtins["int2"].oid)
-def cast_binary_int2(data: bytes) -> int:
+@Loader.binary(builtins["int2"].oid)
+def load_binary_int2(data: bytes) -> int:
rv: int = _int2_struct.unpack(data)[0]
return rv
-@TypeCaster.binary(builtins["int4"].oid)
-def cast_binary_int4(data: bytes) -> int:
+@Loader.binary(builtins["int4"].oid)
+def load_binary_int4(data: bytes) -> int:
rv: int = _int4_struct.unpack(data)[0]
return rv
-@TypeCaster.binary(builtins["int8"].oid)
-def cast_binary_int8(data: bytes) -> int:
+@Loader.binary(builtins["int8"].oid)
+def load_binary_int8(data: bytes) -> int:
rv: int = _int8_struct.unpack(data)[0]
return rv
-@TypeCaster.binary(builtins["oid"].oid)
-def cast_binary_oid(data: bytes) -> int:
+@Loader.binary(builtins["oid"].oid)
+def load_binary_oid(data: bytes) -> int:
rv: int = _oid_struct.unpack(data)[0]
return rv
-@TypeCaster.text(builtins["float4"].oid)
-@TypeCaster.text(builtins["float8"].oid)
-def cast_float(data: bytes) -> float:
+@Loader.text(builtins["float4"].oid)
+@Loader.text(builtins["float8"].oid)
+def load_float(data: bytes) -> float:
    # float() accepts bytes directly
return float(data)
-@TypeCaster.binary(builtins["float4"].oid)
-def cast_binary_float4(data: bytes) -> float:
+@Loader.binary(builtins["float4"].oid)
+def load_binary_float4(data: bytes) -> float:
rv: float = _float4_struct.unpack(data)[0]
return rv
-@TypeCaster.binary(builtins["float8"].oid)
-def cast_binary_float8(data: bytes) -> float:
+@Loader.binary(builtins["float8"].oid)
+def load_binary_float8(data: bytes) -> float:
rv: float = _float8_struct.unpack(data)[0]
return rv
-@TypeCaster.text(builtins["numeric"].oid)
-def cast_numeric(data: bytes) -> Decimal:
+@Loader.text(builtins["numeric"].oid)
+def load_numeric(data: bytes) -> Decimal:
return Decimal(_decode(data)[0])
-_bool_casts = {b"t": True, b"f": False}
-_bool_binary_casts = {b"\x01": True, b"\x00": False}
+_bool_loads = {b"t": True, b"f": False}
+_bool_binary_loads = {b"\x01": True, b"\x00": False}
-@TypeCaster.text(builtins["bool"].oid)
-def cast_bool(data: bytes) -> bool:
- return _bool_casts[data]
+@Loader.text(builtins["bool"].oid)
+def load_bool(data: bytes) -> bool:
+ return _bool_loads[data]
-@TypeCaster.binary(builtins["bool"].oid)
-def cast_binary_bool(data: bytes) -> bool:
- return _bool_binary_casts[data]
+@Loader.binary(builtins["bool"].oid)
+def load_binary_bool(data: bytes) -> bool:
+ return _bool_binary_loads[data]
"""
-Adapters of textual types.
+Adapters for textual types.
"""
# Copyright (C) 2020 The Psycopg Team
import codecs
from typing import Optional, Tuple, Union
-from ..adapt import Adapter, TypeCaster, AdaptContext
+from ..adapt import Dumper, Loader, AdaptContext
from ..utils.typing import EncodeFunc, DecodeFunc
from ..pq import Escaping
from .oids import builtins
BYTEA_OID = builtins["bytea"].oid
-@Adapter.text(str)
-@Adapter.binary(str)
-class StringAdapter(Adapter):
+@Dumper.text(str)
+@Dumper.binary(str)
+class StringDumper(Dumper):
def __init__(self, src: type, context: AdaptContext):
super().__init__(src, context)
else:
self._encode = codecs.lookup("utf8").encode
- def adapt(self, obj: str) -> bytes:
+ def dump(self, obj: str) -> bytes:
return self._encode(obj)[0]
-@TypeCaster.text(builtins["text"].oid)
-@TypeCaster.binary(builtins["text"].oid)
-@TypeCaster.text(builtins["varchar"].oid)
-@TypeCaster.binary(builtins["varchar"].oid)
-class StringCaster(TypeCaster):
+@Loader.text(builtins["text"].oid)
+@Loader.binary(builtins["text"].oid)
+@Loader.text(builtins["varchar"].oid)
+@Loader.binary(builtins["varchar"].oid)
+class StringLoader(Loader):
decode: Optional[DecodeFunc]
else:
self.decode = codecs.lookup("utf8").decode
- def cast(self, data: bytes) -> Union[bytes, str]:
+ def load(self, data: bytes) -> Union[bytes, str]:
if self.decode is not None:
return self.decode(data)[0]
else:
return data
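# Editor's note, not part of this changeset: when the connection encoding is
# SQL_ASCII the loader has no decoder and the branch above returns the raw
# bytes, as exercised by test_load_ascii below ("conn" assumed open):
conn.encoding = "sql_ascii"
cur = conn.cursor()
cur.execute("select 'hello'::text")
cur.fetchone()  # -> (b"hello",)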
-@TypeCaster.text(builtins["name"].oid)
-@TypeCaster.binary(builtins["name"].oid)
-@TypeCaster.text(builtins["bpchar"].oid)
-@TypeCaster.binary(builtins["bpchar"].oid)
-class NameCaster(TypeCaster):
+@Loader.text(builtins["name"].oid)
+@Loader.binary(builtins["name"].oid)
+@Loader.text(builtins["bpchar"].oid)
+@Loader.binary(builtins["bpchar"].oid)
+class NameLoader(Loader):
def __init__(self, oid: int, context: AdaptContext):
super().__init__(oid, context)
else:
self.decode = codecs.lookup("utf8").decode
- def cast(self, data: bytes) -> str:
+ def load(self, data: bytes) -> str:
return self.decode(data)[0]
-@Adapter.text(bytes)
-class BytesAdapter(Adapter):
+@Dumper.text(bytes)
+class BytesDumper(Dumper):
def __init__(self, src: type, context: AdaptContext = None):
super().__init__(src, context)
self.esc = Escaping(
self.connection.pgconn if self.connection is not None else None
)
- def adapt(self, obj: bytes) -> Tuple[bytes, int]:
+ def dump(self, obj: bytes) -> Tuple[bytes, int]:
return self.esc.escape_bytea(obj), BYTEA_OID
-@Adapter.binary(bytes)
-def adapt_bytes(b: bytes) -> Tuple[bytes, int]:
+@Dumper.binary(bytes)
+def dump_bytes(b: bytes) -> Tuple[bytes, int]:
return b, BYTEA_OID
-@TypeCaster.text(builtins["bytea"].oid)
-def cast_bytea(data: bytes) -> bytes:
+@Loader.text(builtins["bytea"].oid)
+def load_bytea(data: bytes) -> bytes:
return Escaping().unescape_bytea(data)
-@TypeCaster.binary(builtins["bytea"].oid)
-def cast_bytea_binary(data: bytes) -> bytes:
+@Loader.binary(builtins["bytea"].oid)
+def load_bytea_binary(data: bytes) -> bytes:
return data
import pytest
-from psycopg3.adapt import Transformer, Format, Adapter, TypeCaster
+from psycopg3.adapt import Transformer, Format, Dumper, Loader
from psycopg3.types.oids import builtins
TEXT_OID = builtins["text"].oid
("hello", Format.BINARY, b"hello", "text"),
],
)
-def test_adapt(data, format, result, type):
+def test_dump(data, format, result, type):
t = Transformer()
- rv = t.adapt(data, format)
+ rv = t.dump(data, format)
if isinstance(rv, tuple):
assert rv[0] == result
assert rv[1] == builtins[type].oid
assert rv == result
-def test_adapt_connection_ctx(conn):
- Adapter.register(str, lambda s: s.encode("ascii") + b"t", conn)
- Adapter.register_binary(str, lambda s: s.encode("ascii") + b"b", conn)
+def test_dump_connection_ctx(conn):
+ Dumper.register(str, lambda s: s.encode("ascii") + b"t", conn)
+ Dumper.register_binary(str, lambda s: s.encode("ascii") + b"b", conn)
cur = conn.cursor()
cur.execute("select %s, %b", ["hello", "world"])
assert cur.fetchone() == ("hellot", "worldb")
-def test_adapt_cursor_ctx(conn):
- Adapter.register(str, lambda s: s.encode("ascii") + b"t", conn)
- Adapter.register_binary(str, lambda s: s.encode("ascii") + b"b", conn)
+def test_dump_cursor_ctx(conn):
+ Dumper.register(str, lambda s: s.encode("ascii") + b"t", conn)
+ Dumper.register_binary(str, lambda s: s.encode("ascii") + b"b", conn)
cur = conn.cursor()
- Adapter.register(str, lambda s: s.encode("ascii") + b"tc", cur)
- Adapter.register_binary(str, lambda s: s.encode("ascii") + b"bc", cur)
+ Dumper.register(str, lambda s: s.encode("ascii") + b"tc", cur)
+ Dumper.register_binary(str, lambda s: s.encode("ascii") + b"bc", cur)
cur.execute("select %s, %b", ["hello", "world"])
assert cur.fetchone() == ("hellotc", "worldbc")
)
def test_cast(data, format, type, result):
t = Transformer()
- rv = t.cast(data, builtins[type].oid, format)
+ rv = t.load(data, builtins[type].oid, format)
assert rv == result
-def test_cast_connection_ctx(conn):
- TypeCaster.register(TEXT_OID, lambda b: b.decode("ascii") + "t", conn)
- TypeCaster.register_binary(
- TEXT_OID, lambda b: b.decode("ascii") + "b", conn
- )
+def test_load_connection_ctx(conn):
+ Loader.register(TEXT_OID, lambda b: b.decode("ascii") + "t", conn)
+ Loader.register_binary(TEXT_OID, lambda b: b.decode("ascii") + "b", conn)
r = conn.cursor().execute("select 'hello'::text").fetchone()
assert r == ("hellot",)
assert r == ("hellob",)
-def test_cast_cursor_ctx(conn):
- TypeCaster.register(TEXT_OID, lambda b: b.decode("ascii") + "t", conn)
- TypeCaster.register_binary(
- TEXT_OID, lambda b: b.decode("ascii") + "b", conn
- )
+def test_load_cursor_ctx(conn):
+ Loader.register(TEXT_OID, lambda b: b.decode("ascii") + "t", conn)
+ Loader.register_binary(TEXT_OID, lambda b: b.decode("ascii") + "b", conn)
cur = conn.cursor()
- TypeCaster.register(TEXT_OID, lambda b: b.decode("ascii") + "tc", cur)
- TypeCaster.register_binary(
- TEXT_OID, lambda b: b.decode("ascii") + "bc", cur
- )
+ Loader.register(TEXT_OID, lambda b: b.decode("ascii") + "tc", cur)
+ Loader.register_binary(TEXT_OID, lambda b: b.decode("ascii") + "bc", cur)
r = cur.execute("select 'hello'::text").fetchone()
assert r == ("hellotc",)
[("'{hello}'::text[]", ["helloc"]), ("row('hello'::text)", ("helloc",))],
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_cursor_ctx_nested(conn, sql, obj, fmt_out):
+def test_load_cursor_ctx_nested(conn, sql, obj, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
- TypeCaster.register(
+ Loader.register(
TEXT_OID, lambda b: b.decode("ascii") + "c", cur, format=fmt_out
)
cur.execute(f"select {sql}")
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("obj, want", tests_str)
-def test_adapt_list_str(conn, obj, want, fmt_in):
+def test_dump_list_str(conn, obj, want, fmt_in):
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
cur.execute(f"select {ph}::text[] = %s::text[]", (obj, want))
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("want, obj", tests_str)
-def test_cast_list_str(conn, obj, want, fmt_out):
+def test_load_list_str(conn, obj, want, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
cur.execute("select %s::text[]", (obj,))
assert cur.fetchone()[0] == want
@pytest.mark.parametrize("obj, want", tests_int)
-def test_adapt_list_int(conn, obj, want):
+def test_dump_list_int(conn, obj, want):
cur = conn.cursor()
cur.execute("select %s::int[] = %s::int[]", (obj, want))
assert cur.fetchone()[0]
def test_bad_binary_array(input):
tx = Transformer()
with pytest.raises(psycopg3.DataError):
- tx.adapt(input, Format.BINARY)
+ tx.dump(input, Format.BINARY)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("want, obj", tests_int)
-def test_cast_list_int(conn, obj, want, fmt_out):
+def test_load_list_int(conn, obj, want, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
cur.execute("select %s::int[]", (obj,))
assert cur.fetchone()[0] == want
def test_array_mixed_numbers():
# TODO: must use the type accommodating the largest/highest precision
tx = Transformer()
- ad = tx.adapt([1, 32767], Format.BINARY)
+ ad = tx.dump([1, 32767], Format.BINARY)
assert ad[1] == builtins["int2"].array_oid
- ad = tx.adapt([1, 32768], Format.BINARY)
+ ad = tx.dump([1, 32768], Format.BINARY)
assert ad[1] == builtins["int4"].array_oid
import pytest
-from psycopg3.adapt import Format, TypeCaster
+from psycopg3.adapt import Format, Loader
from psycopg3.types import builtins, composite
@pytest.mark.parametrize("rec, want", tests_str)
-def test_cast_record(conn, want, rec):
+def test_load_record(conn, want, rec):
cur = conn.cursor()
res = cur.execute(f"select row({rec})").fetchone()[0]
assert res == want
@pytest.mark.parametrize("rec, obj", tests_str)
-def test_adapt_tuple(conn, rec, obj):
+def test_dump_tuple(conn, rec, obj):
cur = conn.cursor()
fields = [f"f{i} text" for i in range(len(obj))]
cur.execute(
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_all_chars(conn, fmt_out):
+def test_load_all_chars(conn, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
for i in range(1, 256):
res = cur.execute("select row(chr(%s::int))", (i,)).fetchone()[0]
),
],
)
-def test_cast_record_binary(conn, want, rec):
+def test_load_record_binary(conn, want, rec):
cur = conn.cursor(binary=True)
res = cur.execute(f"select row({rec})").fetchone()[0]
assert res == want
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_composite(conn, testcomp, fmt_out):
+def test_load_composite(conn, testcomp, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
info = composite.fetch_info(conn, "testcomp")
composite.register(info, conn)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_composite_factory(conn, testcomp, fmt_out):
+def test_load_composite_factory(conn, testcomp, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
info = composite.fetch_info(conn, "testcomp")
composite.register(info)
for fmt in (Format.TEXT, Format.BINARY):
for oid in (info.oid, info.array_oid):
- assert TypeCaster.globals.pop((oid, fmt))
+ assert Loader.globals.pop((oid, fmt))
cur = conn.cursor()
composite.register(info, cur)
for fmt in (Format.TEXT, Format.BINARY):
for oid in (info.oid, info.array_oid):
key = oid, fmt
- assert key not in TypeCaster.globals
- assert key not in conn.casters
- assert key in cur.casters
+ assert key not in Loader.globals
+ assert key not in conn.loaders
+ assert key in cur.loaders
composite.register(info, conn)
for fmt in (Format.TEXT, Format.BINARY):
for oid in (info.oid, info.array_oid):
key = oid, fmt
- assert key not in TypeCaster.globals
- assert key in conn.casters
+ assert key not in Loader.globals
+ assert key in conn.loaders
import pytest
-from psycopg3.adapt import TypeCaster, Transformer, Format
+from psycopg3.adapt import Loader, Transformer, Format
from psycopg3.types import builtins
-from psycopg3.types.numeric import cast_float
+from psycopg3.types.numeric import load_float
#
(int(-(2 ** 63)), "'-9223372036854775808'::bigint"),
],
)
-def test_adapt_int(conn, val, expr):
+def test_dump_int(conn, val, expr):
assert isinstance(val, int)
cur = conn.cursor()
cur.execute("select %s = %%s" % expr, (val,))
@pytest.mark.xfail
-def test_adapt_int_binary():
+def test_dump_int_binary():
# TODO: int binary adaptation (must choose the fitting int2,4,8)
tx = Transformer()
- tx.adapt(1, Format.BINARY)
+ tx.dump(1, Format.BINARY)
@pytest.mark.parametrize(
],
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_int(conn, val, pgtype, want, fmt_out):
+def test_load_int(conn, val, pgtype, want, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
cur.execute(f"select %s::{pgtype}", (val,))
assert cur.pgresult.fformat(0) == fmt_out
(float("-inf"), "'-inf'"),
],
)
-def test_adapt_float(conn, val, expr):
+def test_dump_float(conn, val, expr):
assert isinstance(val, float)
cur = conn.cursor()
cur.execute("select %%s = %s::float8" % expr, (val,))
(-1e-30, "-1e-30"),
],
)
-def test_adapt_float_approx(conn, val, expr):
+def test_dump_float_approx(conn, val, expr):
assert isinstance(val, float)
cur = conn.cursor()
cur.execute(
@pytest.mark.xfail
-def test_adapt_float_binary():
+def test_dump_float_binary():
# TODO: float binary adaptation
tx = Transformer()
- tx.adapt(1.0, Format.BINARY)
+ tx.dump(1.0, Format.BINARY)
@pytest.mark.parametrize(
],
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_float(conn, val, pgtype, want, fmt_out):
+def test_load_float(conn, val, pgtype, want, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
cur.execute(f"select %s::{pgtype}", (val,))
assert cur.pgresult.fformat(0) == fmt_out
],
)
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_float_approx(conn, expr, pgtype, want, fmt_out):
+def test_load_float_approx(conn, expr, pgtype, want, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
cur.execute("select %s::%s" % (expr, pgtype))
assert cur.pgresult.fformat(0) == fmt_out
@pytest.mark.xfail
-def test_adapt_numeric_binary():
+def test_dump_numeric_binary():
# TODO: numeric binary adaptation
tx = Transformer()
- tx.adapt(Decimal(1), Format.BINARY)
+ tx.dump(Decimal(1), Format.BINARY)
@pytest.mark.xfail
-def test_cast_numeric_binary(conn):
+def test_load_numeric_binary(conn):
    # TODO: numeric binary loading
cur = conn.cursor(binary=True)
res = cur.execute("select 1::numeric").fetchone()[0]
)
def test_numeric_as_float(conn, val):
cur = conn.cursor()
- TypeCaster.register(builtins["numeric"].oid, cast_float, cur)
+ Loader.register(builtins["numeric"].oid, load_float, cur)
val = Decimal(val)
cur.execute("select %s", (val,))
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-def test_adapt_1char(conn, fmt_in):
+def test_dump_1char(conn, fmt_in):
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
for i in range(1, 256):
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_1char(conn, typename, fmt_out):
+def test_load_1char(conn, typename, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
for i in range(1, 256):
cur.execute(f"select chr(%s::int)::{typename}", (i,))
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
-def test_adapt_enc(conn, fmt_in, encoding):
+def test_dump_enc(conn, fmt_in, encoding):
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-def test_adapt_ascii(conn, fmt_in):
+def test_dump_ascii(conn, fmt_in):
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-def test_adapt_badenc(conn, fmt_in):
+def test_dump_badenc(conn, fmt_in):
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("encoding", ["utf8", "latin9"])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
-def test_cast_enc(conn, typename, encoding, fmt_out):
+def test_load_enc(conn, typename, encoding, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
conn.encoding = encoding
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar", "name", "bpchar"])
-def test_cast_badenc(conn, typename, fmt_out):
+def test_load_badenc(conn, typename, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
conn.encoding = "latin1"
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("typename", ["text", "varchar"])
-def test_cast_ascii(conn, typename, fmt_out):
+def test_load_ascii(conn, typename, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
conn.encoding = "sql_ascii"
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("typename", ["name", "bpchar"])
-def test_cast_ascii_encanyway(conn, typename, fmt_out):
+def test_load_ascii_encanyway(conn, typename, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
conn.encoding = "sql_ascii"
@pytest.mark.parametrize("fmt_in", [Format.TEXT, Format.BINARY])
-def test_adapt_1byte(conn, fmt_in):
+def test_dump_1byte(conn, fmt_in):
cur = conn.cursor()
ph = "%s" if fmt_in == Format.TEXT else "%b"
for i in range(0, 256):
@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
-def test_cast_1byte(conn, fmt_out):
+def test_load_1byte(conn, fmt_out):
cur = conn.cursor(binary=fmt_out == Format.BINARY)
for i in range(0, 256):
cur.execute("select %s::bytea", (fr"\x{i:02x}",))