from ._compat import Protocol, TypeAlias
PackInt: TypeAlias = Callable[[int], bytes]
-UnpackInt: TypeAlias = Callable[[bytes], Tuple[int]]
+UnpackInt: TypeAlias = Callable[[Buffer], Tuple[int]]
PackFloat: TypeAlias = Callable[[float], bytes]
-UnpackFloat: TypeAlias = Callable[[bytes], Tuple[float]]
+UnpackFloat: TypeAlias = Callable[[Buffer], Tuple[float]]
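For reference, the Buffer name used throughout these signatures is assumed to be the usual bytes-like union; a minimal sketch:

from typing import Union
from ._compat import TypeAlias

Buffer: TypeAlias = Union[bytes, bytearray, memoryview]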
class UnpackLen(Protocol):
return out
- def as_literal(self, obj: Any) -> Buffer:
+ def as_literal(self, obj: Any) -> bytes:
dumper = self.get_dumper(obj, PY_TEXT)
rv = dumper.quote(obj)
# If the result is quoted, and the oid not unknown or text,
# add an explicit type cast.
if type_sql:
rv = b"%s::%s" % (rv, type_sql)
+ if not isinstance(rv, bytes):
+ rv = bytes(rv)
return rv
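The closing isinstance check matters because a dumper's quote() may hand back any bytes-like object, while the declared return type is now bytes; a toy normalization with a hypothetical bytearray result:

rv = bytearray(b"'abc'::text")
if not isinstance(rv, bytes):
    rv = bytes(rv)  # bytearray/memoryview -> bytes; no-op for bytes
assert type(rv) is bytes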
def get_dumper(self, obj: Any, format: PyFormat) -> "Dumper":
return make_row(record)
- def load_sequence(self, record: Sequence[Optional[bytes]]) -> Tuple[Any, ...]:
+ def load_sequence(self, record: Sequence[Optional[Buffer]]) -> Tuple[Any, ...]:
if len(self._row_loaders) != len(record):
raise e.ProgrammingError(
f"cannot load sequence of {len(record)} items:"
# Adaptation types
-DumpFunc: TypeAlias = Callable[[Any], bytes]
-LoadFunc: TypeAlias = Callable[[bytes], Any]
+DumpFunc: TypeAlias = Callable[[Any], Buffer]
+LoadFunc: TypeAlias = Callable[[Buffer], Any]
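Under the widened aliases a load function must accept any bytes-like input, not only bytes; a minimal sketch with a hypothetical integer loader:

from typing import Any

def load_int(data: Buffer) -> Any:  # Buffer as sketched above
    # int() parses bytes/bytearray but rejects memoryview, so normalize
    if not isinstance(data, bytes):
        data = bytes(data)
    return int(data)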
class AdaptContext(Protocol):
) -> Sequence[Optional[Buffer]]:
...
- def as_literal(self, obj: Any) -> Buffer:
+ def as_literal(self, obj: Any) -> bytes:
...
def get_dumper(self, obj: Any, format: PyFormat) -> Dumper:
def load_row(self, row: int, make_row: "RowMaker[Row]") -> Optional["Row"]:
...
- def load_sequence(self, record: Sequence[Optional[bytes]]) -> Tuple[Any, ...]:
+ def load_sequence(self, record: Sequence[Optional[Buffer]]) -> Tuple[Any, ...]:
...
def get_loader(self, oid: int, format: pq.Format) -> Loader:
def __init__(self, cursor: "Cursor[Any]"):
super().__init__(cursor)
- self._queue: queue.Queue[bytes] = queue.Queue(maxsize=QUEUE_SIZE)
+ self._queue: queue.Queue[Buffer] = queue.Queue(maxsize=QUEUE_SIZE)
self._worker: Optional[threading.Thread] = None
self._worker_error: Optional[BaseException] = None
def __init__(self, cursor: "AsyncCursor[Any]"):
super().__init__(cursor)
- self._queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=QUEUE_SIZE)
+ self._queue: asyncio.Queue[Buffer] = asyncio.Queue(maxsize=QUEUE_SIZE)
self._worker: Optional[asyncio.Future[None]] = None
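A minimal sketch of the consuming side of such a queue, with a hypothetical _pgconn handle: any bytes-like item can be handed to the libpq wrapper unchanged.

def worker(self) -> None:
    # hypothetical consumer loop; an empty buffer acts as the stop signal
    while True:
        data = self._queue.get()
        if not data:
            break
        self._pgconn.put_copy_data(data)  # put_copy_data now takes any Buffer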
async def worker(self) -> None:
self._row_mode = False # true if the user is using write_row()
@abstractmethod
- def parse_row(self, data: bytes) -> Optional[Tuple[Any, ...]]:
+ def parse_row(self, data: Buffer) -> Optional[Tuple[Any, ...]]:
...
@abstractmethod
- def write(self, buffer: Union[Buffer, str]) -> bytes:
+ def write(self, buffer: Union[Buffer, str]) -> Buffer:
...
@abstractmethod
- def write_row(self, row: Sequence[Any]) -> bytes:
+ def write_row(self, row: Sequence[Any]) -> Buffer:
...
@abstractmethod
- def end(self) -> bytes:
+ def end(self) -> Buffer:
...
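Returning Buffer instead of bytes lets a formatter hand out its accumulated bytearray directly, avoiding a bytes() copy on every flush; a minimal sketch of the pattern, with hypothetical names:

from typing import Any, Sequence

class _BufferingFormatter:
    def __init__(self) -> None:
        self._write_buffer = bytearray()

    def write_row(self, row: Sequence[Any]) -> Buffer:
        self._write_buffer += b"..."  # formatted row, details elided
        if len(self._write_buffer) >= 8192:  # hypothetical flush threshold
            buffer, self._write_buffer = self._write_buffer, bytearray()
            return buffer
        return b""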
super().__init__(transformer)
self._encoding = encoding
- def parse_row(self, data: bytes) -> Optional[Tuple[Any, ...]]:
+ def parse_row(self, data: Buffer) -> Optional[Tuple[Any, ...]]:
if data:
return parse_row_text(data, self.transformer)
else:
self._signature_sent = True
return data
- def write_row(self, row: Sequence[Any]) -> bytes:
+ def write_row(self, row: Sequence[Any]) -> Buffer:
# Note down that we are writing in row mode: it means we will have
# to take care of the end-of-copy marker too
self._row_mode = True
else:
return b""
- def end(self) -> bytes:
+ def end(self) -> Buffer:
buffer, self._write_buffer = self._write_buffer, bytearray()
return buffer
super().__init__(transformer)
self._signature_sent = False
- def parse_row(self, data: bytes) -> Optional[Tuple[Any, ...]]:
+ def parse_row(self, data: Buffer) -> Optional[Tuple[Any, ...]]:
if not self._signature_sent:
if data[: len(_binary_signature)] != _binary_signature:
raise e.DataError(
self._signature_sent = True
return data
- def write_row(self, row: Sequence[Any]) -> bytes:
+ def write_row(self, row: Sequence[Any]) -> Buffer:
# Note down that we are writing in row mode: it means we will have
# to take care of the end-of-copy marker too
self._row_mode = True
else:
return b""
- def end(self) -> bytes:
+ def end(self) -> Buffer:
# If we have sent no data we need to send the signature
# and the trailer
if not self._signature_sent:
return out
-def _parse_row_text(data: bytes, tx: Transformer) -> Tuple[Any, ...]:
+def _parse_row_text(data: Buffer, tx: Transformer) -> Tuple[Any, ...]:
if not isinstance(data, bytes):
data = bytes(data)
- fields = data.split(b"\t")
- fields[-1] = fields[-1][:-1] # drop \n
+ fields = data.split(b"\t") # type: ignore
+ fields[-1] = fields[-1][:-1] # type: ignore # drop \n
row = [None if f == b"\\N" else _load_re.sub(_load_sub, f) for f in fields]
return tx.load_sequence(row)
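A toy run of the text path above, assuming tab-separated COPY data with a trailing newline and \N for NULL:

data = bytes(memoryview(b"1\tfoo\t\\N\n"))  # Buffer normalized to bytes first
fields = data.split(b"\t")
fields[-1] = fields[-1][:-1]                # drop the trailing newline
assert fields == [b"1", b"foo", b"\\N"]     # b"\\N" is then loaded as None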
-def _parse_row_binary(data: bytes, tx: Transformer) -> Tuple[Any, ...]:
- row: List[Optional[bytes]] = []
+def _parse_row_binary(data: Buffer, tx: Transformer) -> Tuple[Any, ...]:
+ row: List[Optional[Buffer]] = []
nfields = _unpack_int2(data, 0)[0]
pos = 2
for i in range(nfields):
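The binary path assumes PostgreSQL's COPY BINARY tuple layout: a big-endian int16 field count, then an int32 length (-1 for NULL) followed by the payload for each field. struct's unpack_from reads straight from any buffer, so no copy is needed; a toy header read:

import struct

unpack_int2 = struct.Struct("!h").unpack_from  # assumed shape of _unpack_int2
raw = memoryview(b"\x00\x02" b"\x00\x00\x00\x01x" b"\xff\xff\xff\xff")
assert unpack_int2(raw, 0)[0] == 2  # two fields; the second one is NULL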
from . import pq
from . import errors as e
-from .abc import PipelineCommand, PQGen, PQGenConn
+from .abc import Buffer, PipelineCommand, PQGen, PQGenConn
from .pq.abc import PGconn, PGresult
from .waiting import Wait, Ready
from ._compat import Deque
return result
-def copy_to(pgconn: PGconn, buffer: bytes) -> PQGen[None]:
+def copy_to(pgconn: PGconn, buffer: Buffer) -> PQGen[None]:
# Retry enqueuing data until successful.
#
# WARNING! This can cause an infinite loop if the buffer is too large. (see
def exec_params(
self,
command: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
+ param_values: Optional[Sequence[Optional[Buffer]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
def send_query_params(
self,
command: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
+ param_values: Optional[Sequence[Optional[Buffer]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
def send_query_prepared(
self,
name: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
+ param_values: Optional[Sequence[Optional[Buffer]]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
) -> None:
def exec_prepared(
self,
name: bytes,
- param_values: Optional[Sequence[bytes]],
+ param_values: Optional[Sequence[Buffer]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = 0,
) -> "PGresult":
def notifies(self) -> Optional["PGnotify"]:
...
- def put_copy_data(self, buffer: bytes) -> int:
+ def put_copy_data(self, buffer: Buffer) -> int:
...
def put_copy_end(self, error: Optional[bytes] = None) -> int:
def escape_bytea(self, data: Buffer) -> bytes:
...
- def unescape_bytea(self, data: bytes) -> bytes:
+ def unescape_bytea(self, data: Buffer) -> bytes:
...
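With the protocol widened, bytes-like parameters can flow straight into the libpq layer; a hypothetical helper using the signatures above:

def send_blob(pgconn: PGconn, payload: Buffer) -> None:
    # hypothetical: pass a memoryview/bytearray without a bytes() copy
    pgconn.exec_params(b"select $1::bytea", [payload], param_formats=[1])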
def exec_params(
self,
command: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
+ param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
def send_query_params(
self,
command: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
+ param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
def send_query_prepared(
self,
name: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
+ param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
) -> None:
def _query_params_args(
self,
command: bytes,
- param_values: Optional[Sequence[Optional[bytes]]],
+ param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
aparams = (c_char_p * nparams)(
*(
# convert bytearray/memoryview to bytes
- # TODO: avoid copy, at least in the C implementation.
b
if b is None or isinstance(b, bytes)
else bytes(b) # type: ignore[arg-type]
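The per-element conversion is unavoidable here because ctypes' c_char_p accepts bytes (or an integer address) and rejects other buffers:

from ctypes import c_char_p

c_char_p(b"ok")                 # fine
# c_char_p(memoryview(b"no"))  # TypeError: bytes or integer address expected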
def exec_prepared(
self,
name: bytes,
- param_values: Optional[Sequence[bytes]],
+ param_values: Optional[Sequence["abc.Buffer"]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = 0,
) -> "PGresult":
alenghts: Optional[Array[c_int]]
if param_values:
nparams = len(param_values)
- aparams = (c_char_p * nparams)(*param_values)
+ aparams = (c_char_p * nparams)(
+ *(
+ # convert bytearray/memoryview to bytes
+ b if b is None or isinstance(b, bytes) else bytes(b)
+ for b in param_values
+ )
+ )
alenghts = (c_int * nparams)(*(len(p) if p else 0 for p in param_values))
else:
nparams = 0
impl.PQfreemem(out)
return rv
- def unescape_bytea(self, data: bytes) -> bytes:
+ def unescape_bytea(self, data: "abc.Buffer") -> bytes:
# not needed, but let's keep it symmetric with the escaping:
# if a connection is passed in, it must be valid.
if self.conn:
self.conn._ensure_pgconn()
len_out = c_size_t()
+ if not isinstance(data, bytes):
+ data = bytes(data)
out = impl.PQunescapeBytea(
data,
byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
_struct_head = struct.Struct("!III") # ndims, hasnull, elem oid
_pack_head = cast(Callable[[int, int, int], bytes], _struct_head.pack)
-_unpack_head = cast(Callable[[bytes], Tuple[int, int, int]], _struct_head.unpack_from)
+_unpack_head = cast(Callable[[Buffer], Tuple[int, int, int]], _struct_head.unpack_from)
_struct_dim = struct.Struct("!II") # dim, lower bound
_pack_dim = cast(Callable[[int, int], bytes], _struct_dim.pack)
-_unpack_dim = cast(Callable[[bytes, int], Tuple[int, int]], _struct_dim.unpack_from)
+_unpack_dim = cast(Callable[[Buffer, int], Tuple[int, int]], _struct_dim.unpack_from)
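Widening these casts is sound because Struct.unpack_from accepts any object supporting the buffer protocol, not just bytes:

import struct

head = struct.Struct("!III")
assert head.unpack_from(memoryview(bytes(12))) == (0, 0, 0)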
TEXT_ARRAY_OID = postgres.types["text"].array_oid
_re_esc = re.compile(rb'(["\\])')
def dump(self, obj: List[Any]) -> bytes:
- tokens: List[bytes] = []
+ tokens: List[Buffer] = []
needs_quotes = _get_needs_quotes_regexp(self.delimiter).search
def dump_list(obj: List[Any]) -> None:
if not obj:
return _pack_head(0, 0, sub_oid)
- data: List[bytes] = [b"", b""] # placeholders to avoid a resize
+ data: List[Buffer] = [b"", b""] # placeholders to avoid a resize
dims: List[int] = []
hasnull = 0
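Collecting Buffer tokens still permits a single final join, since bytes.join accepts any bytes-like items:

assert b"".join([b"{", memoryview(b"1,2"), bytearray(b"}")]) == b"{1,2}"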
_struct_oidlen = struct.Struct("!Ii")
_pack_oidlen = cast(Callable[[int, int], bytes], _struct_oidlen.pack)
_unpack_oidlen = cast(
- Callable[[bytes, int], Tuple[int, int]], _struct_oidlen.unpack_from
+ Callable[[Buffer, int], Tuple[int, int]], _struct_oidlen.unpack_from
)
if not obj:
return start + end
- parts = [start]
+ parts: List[Buffer] = [start]
for item in obj:
if item is None:
super().__init__(oid, context)
self._tx = Transformer(context)
- def _parse_record(self, data: bytes) -> Iterator[Optional[bytes]]:
+ def _parse_record(self, data: Buffer) -> Iterator[Optional[bytes]]:
"""
Split a non-empty representation of a composite type into components.
)
)
- def _walk_record(self, data: bytes) -> Iterator[Tuple[int, int, int]]:
+ def _walk_record(self, data: Buffer) -> Iterator[Tuple[int, int, int]]:
"""
Yield a sequence of (oid, offset, length) for the content of the record
"""
yield oid, i + 8, length
i += (8 + length) if length > 0 else 8
- def _config_types(self, data: bytes) -> None:
+ def _config_types(self, data: Buffer) -> None:
oids = [r[0] for r in self._walk_record(data)]
self._tx.set_loader_types(oids, self.format)
*self._tx.load_sequence(tuple(self._parse_record(data[1:-1])))
)
- def _config_types(self, data: bytes) -> None:
+ def _config_types(self, data: Buffer) -> None:
self._tx.set_loader_types(self.fields_types, self.format)
_struct_timetz = struct.Struct("!qi") # microseconds, sec tz offset
_pack_timetz = cast(Callable[[int, int], bytes], _struct_timetz.pack)
-_unpack_timetz = cast(Callable[[bytes], Tuple[int, int]], _struct_timetz.unpack)
+_unpack_timetz = cast(Callable[[Buffer], Tuple[int, int]], _struct_timetz.unpack)
_struct_interval = struct.Struct("!qii") # microseconds, days, months
_pack_interval = cast(Callable[[int, int, int], bytes], _struct_interval.pack)
_unpack_interval = cast(
- Callable[[bytes], Tuple[int, int, int]], _struct_interval.unpack
+ Callable[[Buffer], Tuple[int, int, int]], _struct_interval.unpack
)
utc = timezone.utc
class BaseHstoreDumper(RecursiveDumper):
- def dump(self, obj: Hstore) -> bytes:
+ def dump(self, obj: Hstore) -> Buffer:
if not obj:
return b""
from ..errors import DataError
JsonDumpsFunction = Callable[[Any], str]
-JsonLoadsFunction = Callable[[Union[str, bytes, bytearray]], Any]
+JsonLoadsFunction = Callable[[Union[str, bytes]], Any]
def set_json_dumps(
def load(self, data: Buffer) -> Any:
# json.loads() cannot work on memoryview.
- if isinstance(data, memoryview):
+ if not isinstance(data, bytes):
data = bytes(data)
return self.loads(data)
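The broadened check reflects that json.loads only accepts str, bytes, or bytearray; any other buffer must be materialized first:

import json

assert json.loads(b'{"a": 1}') == {"a": 1}
# json.loads(memoryview(b'{"a": 1}'))  # TypeError -> hence the bytes() above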
if data and data[0] != 1:
raise DataError(f"unknown jsonb binary format: {data[0]}")
data = data[1:]
- if isinstance(data, memoryview):
+ if not isinstance(data, bytes):
data = bytes(data)
return self.loads(data)
else:
dump = fail_dump
- out = [b"{"]
+ out: List[Buffer] = [b"{"]
for r in obj:
out.append(dump_range_text(r, dump))
out.append(b",")
else:
dump = fail_dump
- out = [pack_len(len(obj))]
+ out: List[Buffer] = [pack_len(len(obj))]
for r in obj:
data = dump_range_binary(r, dump)
out.append(pack_len(len(data)))
class _IntDumper(Dumper):
- def dump(self, obj: Any) -> bytes:
+ def dump(self, obj: Any) -> Buffer:
t = type(obj)
if t is not int:
# Convert to int in order to dump IntEnum correctly
return str(obj).encode()
- def quote(self, obj: Any) -> bytes:
+ def quote(self, obj: Any) -> Buffer:
value = self.dump(obj)
return value if obj >= 0 else b" " + value
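The leading space presumably keeps a preceding minus or other operator from fusing with the literal into --, which SQL reads as the start of a line comment; a toy check of the same rule:

def quote_int(obj: int) -> bytes:  # mirrors the quote() above
    value = str(obj).encode()
    return value if obj >= 0 else b" " + value

assert b"-" + quote_int(-1) == b"- -1"  # avoids b"--1"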
_contexts[i] = DefaultContext
_unpack_numeric_head = cast(
- Callable[[bytes], Tuple[int, int, int, int]],
+ Callable[[Buffer], Tuple[int, int, int, int]],
struct.Struct("!HhHH").unpack_from,
)
_pack_numeric_head = cast(
# Copyright (C) 2020 The Psycopg Team
import re
-from typing import Any, Callable, Dict, Generic, Optional, TypeVar, Type, Tuple
+from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar, Type, Tuple
from typing import cast
from decimal import Decimal
from datetime import date, datetime
if obj.isempty:
return b"empty"
- parts = [b"[" if obj.lower_inc else b"("]
+ parts: List[Buffer] = [b"[" if obj.lower_inc else b"("]
def dump_item(item: Any) -> Buffer:
ad = dump(item)
def load(self, data: Buffer) -> Union[bytes, str]:
if self._encoding:
if isinstance(data, memoryview):
- return bytes(data).decode(self._encoding)
- else:
- return data.decode(self._encoding)
+ data = bytes(data)
+ return data.decode(self._encoding)
else:
# return bytes for SQL_ASCII db
+ if not isinstance(data, bytes):
+ data = bytes(data)
return data
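The normalization before decode() is needed because memoryview has no decode method; only bytes and bytearray do:

data = memoryview(b"caf\xc3\xa9")
text = bytes(data).decode("utf-8")  # 'café'; data.decode() would AttributeError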
super().__init__(cls, context)
self._esc = Escaping(self.connection.pgconn if self.connection else None)
- def dump(self, obj: bytes) -> Buffer:
+ def dump(self, obj: Buffer) -> Buffer:
return self._esc.escape_bytea(obj)
- def quote(self, obj: bytes) -> bytes:
+ def quote(self, obj: Buffer) -> bytes:
escaped = self.dump(obj)
# We cannot use the base quoting because escape_bytea already returns
self.__class__._escaping = Escaping()
def load(self, data: Buffer) -> bytes:
- return bytes(self._escaping.unescape_bytea(data))
+ return self._escaping.unescape_bytea(data)
class ByteaBinaryLoader(Loader):
format = Format.BINARY
- def load(self, data: Buffer) -> bytes:
+ def load(self, data: Buffer) -> Buffer:
return data
from psycopg import pq
from psycopg import abc
from psycopg.rows import Row, RowMaker
-from psycopg.abc import PipelineCommand
from psycopg.adapt import AdaptersMap, PyFormat
from psycopg.pq.abc import PGconn, PGresult
from psycopg.connection import BaseConnection
def dump_sequence(
self, params: Sequence[Any], formats: Sequence[PyFormat]
) -> Sequence[Optional[abc.Buffer]]: ...
- def as_literal(self, obj: Any) -> abc.Buffer: ...
+ def as_literal(self, obj: Any) -> bytes: ...
def get_dumper(self, obj: Any, format: PyFormat) -> abc.Dumper: ...
def load_rows(self, row0: int, row1: int, make_row: RowMaker[Row]) -> List[Row]: ...
def load_row(self, row: int, make_row: RowMaker[Row]) -> Optional[Row]: ...
- def load_sequence(self, record: Sequence[Optional[bytes]]) -> Tuple[Any, ...]: ...
+ def load_sequence(
+ self, record: Sequence[Optional[abc.Buffer]]
+ ) -> Tuple[Any, ...]: ...
def get_loader(self, oid: int, format: pq.Format) -> abc.Loader: ...
# Generators
def fetch_many(pgconn: PGconn) -> abc.PQGen[List[PGresult]]: ...
def fetch(pgconn: PGconn) -> abc.PQGen[Optional[PGresult]]: ...
def pipeline_communicate(
- pgconn: PGconn, commands: Deque[PipelineCommand]
+ pgconn: PGconn, commands: Deque[abc.PipelineCommand]
) -> abc.PQGen[List[List[PGresult]]]: ...
# Copy support
def format_row_binary(
row: Sequence[Any], tx: abc.Transformer, out: Optional[bytearray] = None
) -> bytearray: ...
-def parse_row_text(data: bytes, tx: abc.Transformer) -> Tuple[Any, ...]: ...
-def parse_row_binary(data: bytes, tx: abc.Transformer) -> Tuple[Any, ...]: ...
+def parse_row_text(data: abc.Buffer, tx: abc.Transformer) -> Tuple[Any, ...]: ...
+def parse_row_binary(data: abc.Buffer, tx: abc.Transformer) -> Tuple[Any, ...]: ...
# vim: set syntax=python:
make_row, <PyObject *>record, NULL)
return record
- cpdef object load_sequence(self, record: Sequence[Optional[bytes]]):
+ cpdef object load_sequence(self, record: Sequence[Optional[Buffer]]):
cdef Py_ssize_t nfields = len(record)
out = PyTuple_New(nfields)
cdef PyObject *loader # borrowed RowLoader