from . import errors as e
from .pq import Format
from .sql import Composable
-from .oids import TEXT_OID, INVALID_OID
from .proto import Query, Params
if TYPE_CHECKING:
self.types: Tuple[int, ...] = ()
self.formats: Optional[List[Format]] = None
- self._unknown_oid = INVALID_OID
self._parts: List[QueryPart]
self.query = b""
self._encoding = "utf-8"
conn = transformer.connection
if conn:
self._encoding = conn.client_encoding
- if conn.pgconn.server_version < 100000:
- self._unknown_oid = TEXT_OID
def convert(self, query: Query, vars: Optional[Params]) -> None:
"""
self._parts, vars, self._order
)
assert self.formats is not None
- ps: List[Optional[bytes]] = [None] * len(params)
- ts = [self._unknown_oid] * len(params)
- for i in range(len(params)):
- param = params[i]
- if param is not None:
- dumper = self._tx.get_dumper(param, self.formats[i])
- ps[i] = dumper.dump(param)
- ts[i] = dumper.oid
- self.params = ps
- self.types = tuple(ts)
+ self.params, self.types = self._tx.dump_sequence(
+ params, self.formats
+ )
else:
self.params = None
self.types = ()
from . import errors as e
from .pq import Format
-from .oids import INVALID_OID
+from .oids import INVALID_OID, TEXT_OID
from .proto import LoadFunc, AdaptContext
if TYPE_CHECKING:
_pgresult: Optional["PGresult"] = None
def __init__(self, context: Optional[AdaptContext] = None):
+ self._unknown_oid = INVALID_OID
+
# WARNING: don't store context, or you'll create a loop with the Cursor
if context:
self._adapters = context.adapters
- self._connection = context.connection
+ conn = self._connection = context.connection
+ # PG 9.6 gives an error if an unknown oid is emitted as column
+ if conn and conn.pgconn.server_version < 100000:
+ self._unknown_oid = TEXT_OID
else:
from .adapt import global_adapters
self._row_loaders = rc
+ def dump_sequence(
+ self, params: Sequence[Any], formats: Sequence[Format]
+ ) -> Tuple[List[Any], Tuple[int, ...]]:
+ ps: List[Optional[bytes]] = [None] * len(params)
+ ts = [self._unknown_oid] * len(params)
+ for i in range(len(params)):
+ param = params[i]
+ if param is not None:
+ dumper = self.get_dumper(param, formats[i])
+ ps[i] = dumper.dump(param)
+ ts[i] = dumper.oid
+
+ return ps, tuple(ts)
+
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
# Fast path: return a Dumper class already instantiated from the same type
cls = type(obj)
# Copyright (C) 2020 The Psycopg Team
from typing import Any, Callable, Generator, Mapping
-from typing import Optional, Sequence, Tuple, TypeVar, Union
+from typing import List, Optional, Sequence, Tuple, TypeVar, Union
from typing import TYPE_CHECKING
from typing_extensions import Protocol
) -> None:
...
+ def dump_sequence(
+ self, params: Sequence[Any], formats: Sequence[Format]
+ ) -> Tuple[List[Any], Tuple[int, ...]]:
+ ...
+
def get_dumper(self, obj: Any, format: Format) -> "Dumper":
...
def set_row_types(
self, types: Sequence[int], formats: Sequence[Format]
) -> None: ...
+ def dump_sequence(
+ self, params: Sequence[Any], formats: Sequence[Format]
+ ) -> Tuple[List[Any], Tuple[int, ...]]: ...
def get_dumper(self, obj: Any, format: Format) -> Dumper: ...
def load_rows(self, row0: int, row1: int) -> Sequence[Tuple[Any, ...]]: ...
def load_row(self, row: int) -> Optional[Tuple[Any, ...]]: ...
cdef pq.PGresult _pgresult
cdef int _nfields, _ntuples
cdef list _row_loaders
+ cdef int _unknown_oid
def __cinit__(self, context: Optional["AdaptContext"] = None):
+ self._unknown_oid = oids.INVALID_OID
if context is not None:
self.adapters = context.adapters
self.connection = context.connection
+
+ # PG 9.6 gives an error if an unknown oid is emitted as column
+ if self.connection and self.connection.pgconn.server_version < 100000:
+ self._unknown_oid = oids.TEXT_OID
else:
from psycopg3.adapt import global_adapters
self.adapters = global_adapters
f"cannot adapt type {cls.__name__} to format {Format(format).name}"
)
+ cpdef dump_sequence(self, object params, object formats):
+ # Verify that they are not none and that PyList_GET_ITEM won't blow up
+ cdef int nparams = len(params)
+ cdef list ps = PyList_New(nparams)
+ cdef tuple ts = PyTuple_New(nparams)
+ cdef object dumped, oid
+ cdef Py_ssize_t size
+
+ cdef int i
+ for i in range(nparams):
+ param = params[i]
+ if param is not None:
+ format = formats[i]
+ dumper = self.get_dumper(param, format)
+ if isinstance(dumper, CDumper):
+ dumped = PyByteArray_FromStringAndSize("", 0)
+ size = (<CDumper>dumper).cdump(param, <bytearray>dumped, 0)
+ PyByteArray_Resize(dumped, size)
+ oid = (<CDumper>dumper).oid
+ else:
+ dumped = dumper.dump(param)
+ oid = dumper.oid
+ else:
+ dumped = None
+ oid = self._unknown_oid
+
+ Py_INCREF(dumped)
+ PyList_SET_ITEM(ps, i, dumped)
+ Py_INCREF(oid)
+ PyTuple_SET_ITEM(ts, i, oid)
+
+ return ps, ts
+
def load_rows(self, int row0, int row1) -> Sequence[Tuple[Any, ...]]:
if self._pgresult is None:
raise e.InterfaceError("result not set")