for row in copy.rows():
print(row) # (10, datetime.date(2046, 12, 24))
-.. admonition:: TODO
-
- Currently only builtin names are recognised; custom types must be
- specified by numeric oid. This wll change after the `TypeRegistry` and
- `AdaptContext` get integrated, none of which I have documented, so you
- haven't seen anything... 👀
-
Copying block-by-block
----------------------
from operator import attrgetter
from . import errors as e
-from .oids import builtins
if TYPE_CHECKING:
from .cursor import BaseCursor
fmod=res.fmod(index),
fsize=res.fsize(index),
)
+ self._type = cursor.adapters.types.get(self._data.ftype)
_attrs = tuple(
attrgetter(attr)
def _type_display(self) -> str:
parts = []
- t = builtins.get(self.type_code)
- parts.append(t.name if t else str(self.type_code))
+ parts.append(self._type.name if self._type else str(self.type_code))
mod1 = self.precision
if mod1 is None:
parts.append(f", {self.scale}")
parts.append(")")
- if t and self.type_code == t.array_oid:
+ if self._type and self.type_code == self._type.array_oid:
parts.append("[]")
return "".join(parts)
@property
def display_size(self) -> Optional[int]:
"""The field size, for :sql:`varchar(n)`, None otherwise."""
- t = builtins.get(self.type_code)
- if not t:
+ if not self._type:
return None
- if t.name in ("varchar", "char"):
+ if self._type.name in ("varchar", "char"):
fmod = self._data.fmod
if fmod >= 0:
return fmod - 4
@property
def precision(self) -> Optional[int]:
"""The number of digits for fixed precision types."""
- t = builtins.get(self.type_code)
- if not t:
+ if not self._type:
return None
dttypes = ("time", "timetz", "timestamp", "timestamptz", "interval")
- if t.name == "numeric":
+ if self._type.name == "numeric":
fmod = self._data.fmod
if fmod >= 0:
return fmod >> 16
- elif t.name in dttypes:
+ elif self._type.name in dttypes:
fmod = self._data.fmod
if fmod >= 0:
return fmod & 0xFFFF
TODO: probably better than precision for datetime objects? review.
"""
- if self.type_code == builtins["numeric"].oid:
+ if self._type and self._type.name == "numeric":
fmod = self._data.fmod - 4
if fmod >= 0:
return fmod & 0xFFFF
from . import proto
from . import errors as e
from ._enums import Format as Format
-from .oids import builtins
+from .oids import TypesRegistry, postgres_types
from .proto import AdaptContext, Buffer as Buffer
if TYPE_CHECKING:
"""
Configure *context* to use this loader to convert values with OID *oid*.
"""
- if isinstance(oid, str):
- oid = builtins[oid].oid
adapters = context.adapters if context else global_adapters
adapters.register_loader(oid, cls)
_dumpers: List[Dict[Union[type, str], Type["Dumper"]]]
_loaders: List[Dict[int, Type["Loader"]]]
+ types: TypesRegistry
# Record if a dumper or loader has an optimised version.
_optimised: Dict[type, type] = {}
- def __init__(self, extend: Optional["AdaptersMap"] = None):
- if extend:
- self._dumpers = extend._dumpers[:]
+ def __init__(
+ self,
+ template: Optional["AdaptersMap"] = None,
+ types: Optional[TypesRegistry] = None,
+ ):
+ if template:
+ self._dumpers = template._dumpers[:]
self._own_dumpers = [False, False]
- self._loaders = extend._loaders[:]
+ self._loaders = template._loaders[:]
self._own_loaders = [False, False]
+ self.types = TypesRegistry(template.types)
else:
self._dumpers = [{}, {}]
self._own_dumpers = [True, True]
self._loaders = [{}, {}]
self._own_loaders = [True, True]
+ self.types = types or TypesRegistry()
# implement the AdaptContext protocol too
@property
self._dumpers[fmt][cls] = dumper
- def register_loader(self, oid: int, loader: Type[Loader]) -> None:
+ def register_loader(
+ self, oid: Union[int, str], loader: Type[Loader]
+ ) -> None:
"""
Configure the context to use *loader* to convert data of oid *oid*.
"""
+ if isinstance(oid, str):
+ oid = self.types[oid].oid
if not isinstance(oid, int):
raise TypeError(
f"loaders should be registered on oid, got {oid} instead"
return cls
-global_adapters = AdaptersMap()
+global_adapters = AdaptersMap(types=postgres_types)
Transformer: Type[proto.Transformer]
from . import pq
from . import errors as e
from .pq import ExecStatus
-from .oids import builtins
from .adapt import Format
from .proto import ConnectionType, PQGen, Transformer
from .generators import copy_from, copy_to, copy_end
custom data types you must use their oid.
"""
- # TODO: should allow names of non-builtin types
- # Must put a types map on the context.
+ registry = self.cursor.adapters.types
oids = [
- t if isinstance(t, int) else builtins.get_oid(t) for t in types
+ t if isinstance(t, int) else registry.get_oid(t) for t in types
]
self.formatter.transformer.set_row_types(
oids, [self.formatter.format] * len(types)
from typing import Any, Sequence
from .pq import Format
-from .oids import builtins
+from .oids import postgres_types as builtins
from .adapt import Dumper
Container for the information about types in a database.
"""
- def __init__(self) -> None:
- self._by_oid: Dict[int, TypeInfo] = {}
- self._by_name: Dict[str, TypeInfo] = {}
- self._by_range_subtype: Dict[int, TypeInfo] = {}
+ def __init__(self, template: Optional["TypesRegistry"] = None):
+ self._by_oid: Dict[int, TypeInfo]
+ self._by_name: Dict[str, TypeInfo]
+ self._by_range_subtype: Dict[int, TypeInfo]
+
+ if template:
+ self._by_oid = template._by_oid
+ self._by_name = template._by_name
+ self._by_range_subtype = template._by_range_subtype
+ self._own_state = False
+ else:
+ self._by_oid = {}
+ self._by_name = {}
+ self._by_range_subtype = {}
+ self._own_state = True
def add(self, info: TypeInfo) -> None:
+ self._ensure_own_state()
self._by_oid[info.oid] = info
if info.array_oid:
self._by_oid[info.array_oid] = info
return None
return self._by_range_subtype.get(info.oid)
+ def _ensure_own_state(self) -> None:
+ # Time to write! so, copy.
+ if not self._own_state:
+ self._by_oid = self._by_oid.copy()
+ self._by_name = self._by_name.copy()
+ self._by_range_subtype = self._by_range_subtype.copy()
+ self._own_state = True
+
-builtins = TypesRegistry()
+postgres_types = TypesRegistry()
# Use tools/update_oids.py to update this data.
for r in [
("xml", 142, 143, 0, "xml", ","),
# autogenerated: end
]:
- builtins.add(BuiltinTypeInfo(*r))
+ postgres_types.add(BuiltinTypeInfo(*r))
# A few oids used a bit everywhere
INVALID_OID = 0
-TEXT_OID = builtins["text"].oid
-TEXT_ARRAY_OID = builtins["text"].array_oid
+TEXT_OID = postgres_types["text"].oid
+TEXT_ARRAY_OID = postgres_types["text"].array_oid
# Copyright (C) 2020-2021 The Psycopg Team
-from ..oids import builtins, INVALID_OID
+from ..oids import INVALID_OID
from ..proto import AdaptContext
# Register default adapters
RecordLoader.register("record", ctx)
RecordBinaryLoader.register("record", ctx)
- array.register_all_arrays()
+ array.register_all_arrays(ctx)
from .. import pq
from .. import errors as e
-from ..oids import builtins, TEXT_OID, TEXT_ARRAY_OID, INVALID_OID
+from ..oids import postgres_types, TEXT_OID, TEXT_ARRAY_OID, INVALID_OID
from ..adapt import Buffer, Dumper, Loader, Transformer
from ..adapt import Format as Pg3Format
from ..proto import AdaptContext
super().__init__(cls, context)
self._tx = Transformer(context)
self.sub_dumper: Optional[Dumper] = None
+ self._types = context.adapters.types if context else postgres_types
def get_key(self, obj: List[Any], format: Pg3Format) -> Tuple[type, ...]:
item = self._find_list_element(obj)
Return the oid of the array from the oid of the base item.
Fall back on text[].
- TODO: we shouldn't consider builtins only, but other adaptation
- contexts too
"""
oid = 0
if base_oid:
- info = builtins.get(base_oid)
+ info = self._types.get(base_oid)
if info:
oid = info.array_oid
loader.register(array_oid, context=context)
-def register_all_arrays() -> None:
+def register_all_arrays(ctx: AdaptContext) -> None:
"""
Associate the array oid of all the types in Loader.globals.
This function is designed to be called once at import time, after having
registered all the base loaders.
"""
- for t in builtins:
+ for t in ctx.adapters.types:
# TODO: handle different delimiters (box)
if t.array_oid and getattr(t, "delimiter", None) == ",":
register(t.array_oid, t.oid, name=t.name)
class TupleDumper(SequenceDumper):
# Should be this, but it doesn't work
- # _oid = builtins["record"].oid
+ # _oid = postgres_types["record"].oid
def dump(self, obj: Tuple[Any, ...]) -> bytes:
return self._dump_sequence(obj, b"(", b")", b",")
from typing import cast, Optional, Tuple, Union
from ..pq import Format
-from ..oids import builtins
+from ..oids import postgres_types as builtins
from ..adapt import Buffer, Dumper, Loader, Format as Pg3Format
from ..proto import AdaptContext
from ..errors import InterfaceError, DataError
from typing import Any, Callable, Optional
from ..pq import Format
-from ..oids import builtins
+from ..oids import postgres_types as builtins
from ..adapt import Buffer, Dumper, Loader
from ..errors import DataError
from typing import Callable, Optional, Union, TYPE_CHECKING
from ..pq import Format
-from ..oids import builtins
+from ..oids import postgres_types as builtins
from ..adapt import Buffer, Dumper, Loader
from ..proto import AdaptContext
from decimal import Decimal
from ..pq import Format
-from ..oids import builtins
+from ..oids import postgres_types as builtins
from ..adapt import Buffer, Dumper, Loader
from ..adapt import Format as Pg3Format
from ..wrappers.numeric import Int2, Int4, Int8, IntNumeric
from .. import sql
from .. import errors as e
from ..pq import Format
-from ..oids import builtins, TypeInfo, INVALID_OID
+from ..oids import postgres_types as builtins, TypeInfo, INVALID_OID
from ..adapt import Buffer, Dumper, Loader, Format as Pg3Format
from ..proto import AdaptContext
def __init__(self, cls: type, context: Optional[AdaptContext] = None):
super().__init__(cls, context)
self.sub_dumper: Optional[Dumper] = None
+ self._types = context.adapters.types if context else builtins
def dump(self, obj: Range[Any]) -> bytes:
if not obj:
TODO: we shouldn't consider builtins only, but other adaptation
contexts too
"""
- info = builtins.get_range(sub_oid)
+ info = self._types.get_range(sub_oid)
return info.oid if info else INVALID_OID
# Copyright (C) 2020-2021 The Psycopg Team
from ..pq import Format
-from ..oids import builtins
+from ..oids import postgres_types as builtins
from ..adapt import Buffer, Dumper, Loader
from typing import Optional, Union, TYPE_CHECKING
from ..pq import Format, Escaping
-from ..oids import builtins
+from ..oids import postgres_types as builtins
from ..adapt import Buffer, Dumper, Loader
from ..proto import AdaptContext
from ..errors import DataError
from typing import Callable, Optional, TYPE_CHECKING
from ..pq import Format
-from ..oids import builtins
+from ..oids import postgres_types as builtins
from ..adapt import Buffer, Dumper, Loader
from ..proto import AdaptContext
context: Optional["AdaptContext"] = None,
int format = PQ_TEXT,
) -> None:
- if isinstance(oid, str):
- from psycopg3.oids import builtins
- oid = builtins[oid].oid
-
if context is not None:
adapters = context.adapters
else:
import psycopg3
from psycopg3 import sql
-from psycopg3.oids import builtins
from psycopg3.adapt import Format
record = self.make_record(nulls=0)
tx = psycopg3.adapt.Transformer(self.conn)
types = []
+ registry = self.conn.adapters.types
for value in record:
dumper = tx.get_dumper(value, self.format)
dumper.dump(value) # load the oid if it's dynamic (e.g. array)
- info = builtins.get(dumper.oid) or builtins.get("text")
+ info = registry.get(dumper.oid) or registry.get("text")
if dumper.oid == info.array_oid:
types.append(sql.SQL("{}[]").format(sql.Identifier(info.name)))
else:
import psycopg3
from psycopg3 import pq
from psycopg3.adapt import Transformer, Format, Dumper, Loader
-from psycopg3.oids import builtins, TEXT_OID
+from psycopg3.oids import postgres_types as builtins, TEXT_OID
@pytest.mark.parametrize(
from psycopg3 import sql
from psycopg3 import errors as e
from psycopg3.pq import Format
-from psycopg3.oids import builtins
from psycopg3.adapt import Format as PgFormat
from psycopg3.types.numeric import Int4
select 10::int4, 'hello'::text, '{{0.0,1.0}}'::float8[]
) to stdout (format {format.name})"""
) as copy:
- types = ["int4", "text", "float8[]"]
- if typetype == "oids":
- types = [builtins.get_oid(t) for t in types]
- copy.set_types(types)
+ copy.set_types(["int4", "text", "float8[]"])
row = copy.read_row()
assert copy.read_row() is None
with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
- copy.set_types(
- [builtins["int4"].oid, builtins["int4"].oid, builtins["text"].oid]
- )
+ copy.set_types(["int4", "int4", "text"])
rows = list(copy.rows())
assert rows == sample_records
"copy (select unnest({}::text[])) to stdout (format {})"
).format(chars, sql.SQL(format.name))
with cur.copy(query) as copy:
- copy.set_types([builtins["text"].oid])
+ copy.set_types(["text"])
while 1:
row = copy.read_row()
if not row:
from psycopg3 import sql
from psycopg3 import errors as e
from psycopg3.pq import Format
-from psycopg3.oids import builtins
from psycopg3.adapt import Format as PgFormat
from .test_copy import sample_text, sample_binary, sample_binary_rows # noqa
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
- # TODO: should be passed by name
- # big refactoring to be had, to have builtins not global and merged
- # to adaptation context I guess...
- copy.set_types(
- [builtins["int4"].oid, builtins["int4"].oid, builtins["text"].oid]
- )
+ copy.set_types("int4 int4 text".split())
rows = []
while 1:
row = await copy.read_row()
async with cur.copy(
f"copy ({sample_values}) to stdout (format {format.name})"
) as copy:
- copy.set_types(
- [builtins["int4"].oid, builtins["int4"].oid, builtins["text"].oid]
- )
+ copy.set_types("int4 int4 text".split())
rows = []
async for row in copy.rows():
rows.append(row)
"copy (select unnest({}::text[])) to stdout (format {})"
).format(chars, sql.SQL(format.name))
async with cur.copy(query) as copy:
- copy.set_types([builtins["text"].oid])
+ copy.set_types(["text"])
while 1:
row = await copy.read_row()
if not row:
import psycopg3
from psycopg3 import sql
-from psycopg3.oids import builtins
+from psycopg3.oids import postgres_types as builtins
from psycopg3.adapt import Format
import psycopg3
from psycopg3 import pq
from psycopg3 import sql
-from psycopg3.oids import builtins
+from psycopg3.oids import postgres_types as builtins
from psycopg3.adapt import Format, Transformer
from psycopg3.types import array
from psycopg3 import pq
from psycopg3.sql import Identifier
-from psycopg3.oids import builtins
+from psycopg3.oids import postgres_types as builtins
from psycopg3.adapt import Format, global_adapters
from psycopg3.types.composite import CompositeInfo
from psycopg3 import pq
from psycopg3 import sql
-from psycopg3.oids import builtins
from psycopg3.adapt import Transformer, Format
from psycopg3.types.numeric import FloatLoader
cur = conn.cursor(binary=fmt_out)
cur.execute(f"select %s::{pgtype}", (val,))
assert cur.pgresult.fformat(0) == fmt_out
- assert cur.pgresult.ftype(0) == builtins[pgtype].oid
+ assert cur.pgresult.ftype(0) == conn.adapters.types[pgtype].oid
result = cur.fetchone()[0]
assert result == want
assert type(result) is type(want)
# arrays work too
cur.execute(f"select array[%s::{pgtype}]", (val,))
assert cur.pgresult.fformat(0) == fmt_out
- assert cur.pgresult.ftype(0) == builtins[pgtype].array_oid
+ assert cur.pgresult.ftype(0) == conn.adapters.types[pgtype].array_oid
result = cur.fetchone()[0]
assert result == [want]
assert type(result[0]) is type(want)
cur = conn.cursor(binary=fmt_out)
cur.execute(f"select %s::{pgtype}", (val,))
assert cur.pgresult.fformat(0) == fmt_out
- assert cur.pgresult.ftype(0) == builtins[pgtype].oid
+ assert cur.pgresult.ftype(0) == conn.adapters.types[pgtype].oid
result = cur.fetchone()[0]
def check(result, want):
cur.execute(f"select array[%s::{pgtype}]", (val,))
assert cur.pgresult.fformat(0) == fmt_out
- assert cur.pgresult.ftype(0) == builtins[pgtype].array_oid
+ assert cur.pgresult.ftype(0) == conn.adapters.types[pgtype].array_oid
result = cur.fetchone()[0]
assert isinstance(result, list)
check(result[0], want)
)
def test_numeric_as_float(conn, val):
cur = conn.cursor()
- FloatLoader.register(builtins["numeric"].oid, cur)
+ FloatLoader.register(conn.adapters.types["numeric"].oid, cur)
val = Decimal(val)
cur.execute("select %s as val", (val,))
import pytest
from psycopg3.sql import Identifier
-from psycopg3.oids import builtins
from psycopg3.types import range as mrange
from psycopg3.types.range import Range
assert info.name == "testrange"
assert info.oid > 0
assert info.oid != info.array_oid > 0
- assert info.range_subtype == builtins[subtype].oid
+ assert info.range_subtype == conn.adapters.types[subtype].oid
def test_fetch_info_not_found(conn):
assert info.name == "testrange"
assert info.oid > 0
assert info.oid != info.array_oid > 0
- assert info.range_subtype == builtins[subtype].oid
+ assert info.range_subtype == aconn.adapters.types[subtype].oid
@pytest.mark.asyncio
from psycopg3 import pq
from psycopg3 import sql
-from psycopg3.oids import builtins
+from psycopg3.oids import postgres_types as builtins
from psycopg3.adapt import Transformer, Format