BinaryDumper.register(Binary, global_adapters) # dbapi20
-# Override adapters with fast version if available
-if pq.__impl__ == "c":
- from psycopg3_c import _psycopg3
-
- _psycopg3.register_builtin_c_adapters()
-
-
# Note: defining the exported methods helps both Sphynx in documenting that
# this is the canonical place to obtain them and should be used by MyPy too,
# so that function signatures are consistent with the documentation.
# Copyright (C) 2020 The Psycopg Team
from abc import ABC, abstractmethod
-from typing import Any, Dict, List, Optional, Type, Union
-from typing import TYPE_CHECKING
+from typing import Any, Dict, List, Optional, Type, TypeVar, Union
+from typing import cast, TYPE_CHECKING
from . import pq
from . import proto
from .pq import Format as Format
if TYPE_CHECKING:
from .connection import BaseConnection
+RV = TypeVar("RV")
+
class Dumper(ABC):
"""
"""
format: Format
- connection: Optional["BaseConnection"] = None
# A class-wide oid, which will be used by default by instances unless
# the subclass overrides it in init.
def __init__(self, cls: type, context: Optional[AdaptContext] = None):
self.cls = cls
- self.connection = context.connection if context else None
+ self.connection: Optional["BaseConnection"] = (
+ context.connection if context else None
+ )
self.oid = self._oid
"""The oid to pass to the server, if known."""
"""
format: Format
- connection: Optional["BaseConnection"]
def __init__(self, oid: int, context: Optional[AdaptContext] = None):
self.oid = oid
- self.connection = context.connection if context else None
+ self.connection: Optional["BaseConnection"] = (
+ context.connection if context else None
+ )
@abstractmethod
def load(self, data: bytes) -> Any:
_dumpers: List[Dict[Union[type, str], Type["Dumper"]]]
_loaders: List[Dict[int, Type["Loader"]]]
+ # Record if a dumper or loader has an optimised version.
+ _optimised: Dict[type, type] = {}
+
def __init__(self, extend: Optional["AdaptersMap"] = None):
if extend:
self._dumpers = extend._dumpers[:]
f"dumpers should be registered on classes, got {cls} instead"
)
+ dumper = self._get_optimised(dumper)
fmt = dumper.format
if not self._own_dumpers[fmt]:
self._dumpers[fmt] = self._dumpers[fmt].copy()
f"loaders should be registered on oid, got {oid} instead"
)
+ loader = self._get_optimised(loader)
fmt = loader.format
if not self._own_loaders[fmt]:
self._loaders[fmt] = self._loaders[fmt].copy()
"""
return self._loaders[format].get(oid)
+ @classmethod
+ def _get_optimised(self, cls: Type[RV]) -> Type[RV]:
+ """Return the optimised version of a Dumper or Loader class.
+
+ Return the input class itself if there is no optimised version.
+ """
+ try:
+ return self._optimised[cls]
+ except KeyError:
+ pass
+
+ # Check if the class comes from psycopg3.types and there is a class
+ # with the same name in psycopg3_c._psycopg3.
+ if pq.__impl__ == "c":
+ from psycopg3 import types
+ from psycopg3_c import _psycopg3
+
+ if cls.__module__.startswith(types.__name__):
+ new = cast(Type[RV], getattr(_psycopg3, cls.__name__, None))
+ if new:
+ self._optimised[cls] = new
+ return new
+
+ self._optimised[cls] = cls
+ return cls
+
global_adapters = AdaptersMap()
) -> Tuple[Any, ...]: ...
def get_loader(self, oid: int, format: Format) -> Loader: ...
-def register_builtin_c_adapters() -> None: ...
def connect(conninfo: str) -> proto.PQGenConn[PGconn]: ...
def execute(pgconn: PGconn) -> proto.PQGen[List[PGresult]]: ...
def format_row_binary(row: Sequence[Any], tx: proto.Transformer) -> bytes: ...
from typing import Any
+cimport cython
from cpython.bytes cimport PyBytes_AsStringAndSize
from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize
from cpython.bytearray cimport PyByteArray_GET_SIZE, PyByteArray_AS_STRING
logger = logging.getLogger("psycopg3.adapt")
+@cython.freelist(8)
cdef class CDumper:
cdef object cls
cdef public libpq.Oid oid
return PyByteArray_AS_STRING(ba) + offset
+@cython.freelist(8)
cdef class CLoader:
cdef public libpq.Oid oid
- cdef public connection
+ cdef readonly connection
def __init__(self, int oid, context: Optional[AdaptContext] = None):
self.oid = oid
from psycopg3.adapt import global_adapters as adapters
adapters.register_loader(oid, cls)
-
-
-def register_builtin_c_adapters():
- """
- Register all the builtin optimized adpaters.
-
- This function is supposed to be called only once, after the Python adapters
- are registered.
-
- """
- logger.debug("registering optimised c adapters")
- register_numeric_c_adapters()
- register_singletons_c_adapters()
- register_text_c_adapters()
# Copyright (C) 2020 The Psycopg Team
+cimport cython
+
from libc.stdint cimport *
from libc.string cimport memcpy, strlen
from cpython.mem cimport PyMem_Free
int Py_DTSF_ADD_DOT_0
+# @cython.final # TODO? causes compile warnings
cdef class IntDumper(CDumper):
format = Format.TEXT
return rv
+@cython.final
cdef class Int4BinaryDumper(CDumper):
format = Format.BINARY
return sizeof(int32_t)
+@cython.final
cdef class Int8BinaryDumper(CDumper):
format = Format.BINARY
return sizeof(int64_t)
+@cython.final
cdef class IntLoader(CLoader):
format = Format.TEXT
return PyLong_FromString(data, NULL, 10)
+@cython.final
cdef class Int2BinaryLoader(CLoader):
format = Format.BINARY
return PyLong_FromLong(<int16_t>be16toh((<uint16_t *>data)[0]))
+@cython.final
cdef class Int4BinaryLoader(CLoader):
format = Format.BINARY
return PyLong_FromLong(<int32_t>be32toh((<uint32_t *>data)[0]))
+@cython.final
cdef class Int8BinaryLoader(CLoader):
format = Format.BINARY
return PyLong_FromLongLong(<int64_t>be64toh((<uint64_t *>data)[0]))
+@cython.final
cdef class OidBinaryLoader(CLoader):
format = Format.BINARY
return PyLong_FromUnsignedLong(be32toh((<uint32_t *>data)[0]))
+@cython.final
cdef class FloatDumper(CDumper):
format = Format.TEXT
}
+@cython.final
cdef class FloatBinaryDumper(CDumper):
format = Format.BINARY
return sizeof(swp)
+@cython.final
cdef class FloatLoader(CLoader):
format = Format.TEXT
return PyFloat_FromDouble(d)
+@cython.final
cdef class Float4BinaryLoader(CLoader):
format = Format.BINARY
return PyFloat_FromDouble((<float *>swp)[0])
+@cython.final
cdef class Float8BinaryLoader(CLoader):
format = Format.BINARY
cdef uint64_t asint = be64toh((<uint64_t *>data)[0])
cdef char *swp = <char *>&asint
return PyFloat_FromDouble((<double *>swp)[0])
-
-
-cdef void register_numeric_c_adapters():
- logger.debug("registering optimised numeric c adapters")
-
- IntDumper.register(int)
- Int8BinaryDumper.register(int)
-
- IntLoader.register(oids.INT2_OID)
- IntLoader.register(oids.INT4_OID)
- IntLoader.register(oids.INT8_OID)
- IntLoader.register(oids.OID_OID)
- Int2BinaryLoader.register(oids.INT2_OID)
- Int4BinaryLoader.register(oids.INT4_OID)
- Int8BinaryLoader.register(oids.INT8_OID)
- OidBinaryLoader.register(oids.OID_OID)
-
- FloatDumper.register(float)
- FloatBinaryDumper.register(float)
-
- FloatLoader.register(oids.FLOAT4_OID)
- FloatLoader.register(oids.FLOAT8_OID)
- Float4BinaryLoader.register(oids.FLOAT4_OID)
- Float8BinaryLoader.register(oids.FLOAT8_OID)
# Copyright (C) 2020 The Psycopg Team
+cimport cython
+
from psycopg3.pq import Format
+@cython.final
cdef class BoolDumper(CDumper):
format = Format.TEXT
return b"true" if obj else b"false"
-cdef class BoolBinaryDumper(BoolDumper):
+@cython.final
+cdef class BoolBinaryDumper(CDumper):
format = Format.BINARY
+ def __cinit__(self):
+ self.oid = oids.BOOL_OID
+
cdef Py_ssize_t cdump(self, obj, bytearray rv, Py_ssize_t offset) except -1:
CDumper.ensure_size(rv, offset, 1)
return 1
+@cython.final
cdef class BoolLoader(CLoader):
format = Format.TEXT
return True if data[0] == b't' else False
+@cython.final
cdef class BoolBinaryLoader(CLoader):
format = Format.BINARY
cdef object cload(self, const char *data, size_t length):
return True if data[0] else False
-
-
-cdef void register_singletons_c_adapters():
- logger.debug("registering optimised singletons c adapters")
-
- BoolDumper.register(bool)
- BoolBinaryDumper.register(bool)
-
- BoolLoader.register(oids.BOOL_OID)
- BoolBinaryLoader.register(oids.BOOL_OID)
# Copyright (C) 2020 The Psycopg Team
+cimport cython
+
from libc.string cimport memcpy, memchr
from cpython.bytes cimport PyBytes_AsString, PyBytes_AsStringAndSize
from cpython.unicode cimport (
):
self.is_utf8 = 1
-
-cdef class StringBinaryDumper(_StringDumper):
-
- format = Format.BINARY
-
cdef Py_ssize_t cdump(self, obj, bytearray rv, Py_ssize_t offset) except -1:
# the server will raise DataError subclass if the string contains 0x00
cdef Py_ssize_t size;
return size
-cdef class StringDumper(StringBinaryDumper):
+@cython.final
+cdef class StringBinaryDumper(_StringDumper):
+
+ format = Format.BINARY
+
+
+@cython.final
+cdef class StringDumper(_StringDumper):
format = Format.TEXT
return size
-cdef class TextLoader(CLoader):
+cdef class _TextLoader(CLoader):
format = Format.TEXT
else:
return data[:length]
+@cython.final
+cdef class TextLoader(_TextLoader):
+
+ format = Format.TEXT
+
+
+@cython.final
+cdef class TextBinaryLoader(_TextLoader):
-cdef class TextBinaryLoader(TextLoader):
format = Format.BINARY
+@cython.final
cdef class BytesDumper(CDumper):
format = Format.TEXT
return len_out
+@cython.final
cdef class BytesBinaryDumper(CDumper):
format = Format.BINARY
return size
+@cython.final
cdef class ByteaLoader(CLoader):
format = Format.TEXT
return rv
+@cython.final
cdef class ByteaBinaryLoader(CLoader):
format = Format.BINARY
cdef object cload(self, const char *data, size_t length):
return data[:length]
-
-
-cdef void register_text_c_adapters():
- logger.debug("registering optimised text c adapters")
-
- StringDumper.register(str)
- StringBinaryDumper.register(str)
-
- TextLoader.register(oids.INVALID_OID)
- TextLoader.register(oids.BPCHAR_OID)
- TextLoader.register(oids.NAME_OID)
- TextLoader.register(oids.TEXT_OID)
- TextLoader.register(oids.VARCHAR_OID)
- TextBinaryLoader.register(oids.BPCHAR_OID)
- TextBinaryLoader.register(oids.NAME_OID)
- TextBinaryLoader.register(oids.TEXT_OID)
- TextBinaryLoader.register(oids.VARCHAR_OID)
-
- BytesDumper.register(bytes)
- BytesDumper.register(bytearray)
- BytesDumper.register(memoryview)
- BytesBinaryDumper.register(bytes)
- BytesBinaryDumper.register(bytearray)
- BytesBinaryDumper.register(memoryview)
-
- ByteaLoader.register(oids.BYTEA_OID)
- ByteaBinaryLoader.register(oids.BYTEA_OID)
- ByteaBinaryLoader.register(oids.INVALID_OID)
assert cur.fetchone() == ("hello", "world")
+def test_subclass_dumper(conn):
+ # This might be a C fast object: make sure that the Python code is called
+ from psycopg3.types import StringDumper
+
+ class MyStringDumper(StringDumper):
+ def dump(self, obj):
+ return (obj * 2).encode("utf-8")
+
+ MyStringDumper.register(str, conn)
+ assert conn.execute("select %s", ["hello"]).fetchone()[0] == "hellohello"
+
+
+def test_subclass_loader(conn):
+ # This might be a C fast object: make sure that the Python code is called
+ from psycopg3.types import TextLoader
+
+ class MyTextLoader(TextLoader):
+ def load(self, data):
+ return (data * 2).decode("utf-8")
+
+ MyTextLoader.register("text", conn)
+ assert conn.execute("select 'hello'::text").fetchone()[0] == "hellohello"
+
+
@pytest.mark.parametrize(
"data, format, type, result",
[
sample_values = "values (10::int, 20::int, 'hello'::text), (40, NULL, 'world')"
-sample_tabledef = "col1 int primary key, col2 int, data text"
+sample_tabledef = "col1 serial primary key, col2 int, data text"
sample_text = b"""\
10\t20\thello
assert cur.rowcount == 0
+@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+def test_subclass_adapter(conn, format):
+ if format == Format.TEXT:
+ from psycopg3.types import StringDumper as BaseDumper
+ else:
+ from psycopg3.types import StringBinaryDumper as BaseDumper
+
+ class MyStringDumper(BaseDumper):
+ def dump(self, obj):
+ return super().dump(obj) * 2
+
+ MyStringDumper.register(str, conn)
+
+ cur = conn.cursor()
+ ensure_table(cur, sample_tabledef)
+
+ with cur.copy(
+ f"copy copy_in (data) from stdin (format {format.name})"
+ ) as copy:
+ copy.write_row(("hello",))
+
+ rec = cur.execute("select data from copy_in").fetchone()
+ assert rec[0] == "hellohello"
+
+
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
def test_copy_in_error_empty(conn, format):
cur = conn.cursor()