def __init__(self, src: type, context: AdaptContext = None):
self.src = src
self.context = context
- self.connection = _connection_from_context(context)
+ self.connection = connection_from_context(context)
@abstractmethod
def dump(self, obj: Any) -> bytes:
def __init__(self, oid: int, context: AdaptContext = None):
self.oid = oid
self.context = context
- self.connection = _connection_from_context(context)
+ self.connection = connection_from_context(context)
@abstractmethod
def load(self, data: bytes) -> Any:
return binary_
-def _connection_from_context(
+def connection_from_context(
context: AdaptContext,
) -> Optional[BaseConnection]:
if not context:
def __init__(self, conn: Optional[PGconn] = None):
self.conn = conn
- def escape_literal(self, data: bytes) -> bytes:
+ def escape_literal(self, data: "proto.Buffer") -> memoryview:
if self.conn:
self.conn._ensure_pgconn()
+ # TODO: might be done without copy (however C does that)
+ if not isinstance(data, bytes):
+ data = bytes(data)
out = impl.PQescapeLiteral(self.conn.pgconn_ptr, data, len(data))
if not out:
raise PQerror(
)
rv = string_at(out)
impl.PQfreemem(out)
- return rv
+ return memoryview(rv)
else:
raise PQerror("escape_literal failed: no connection provided")
)
return out.value
- def escape_bytea(self, data: bytes) -> bytes:
+ def escape_bytea(self, data: "proto.Buffer") -> memoryview:
len_out = c_size_t()
+ # TODO: might be able to do without a copy but it's a mess.
+ # the C library does it better anyway, so maybe not worth optimising
+ # https://mail.python.org/pipermail/python-dev/2012-September/121780.html
+ if not isinstance(data, bytes):
+ data = bytes(data)
if self.conn:
self.conn._ensure_pgconn()
out = impl.PQescapeByteaConn(
rv = string_at(out, len_out.value - 1) # out includes final 0
impl.PQfreemem(out)
- return rv
+ return memoryview(rv)
- def unescape_bytea(self, data: bytes) -> bytes:
+ def unescape_bytea(self, data: bytes) -> memoryview:
# not needed, but let's keep it symmetric with the escaping:
# if a connection is passed in, it must be valid.
if self.conn:
rv = string_at(out, len_out.value)
impl.PQfreemem(out)
- return rv
+ return memoryview(rv)
# Copyright (C) 2020 The Psycopg Team
-from typing import Any, Callable, List, Optional, Sequence, Tuple
+from typing import Any, Callable, List, Optional, Sequence, Tuple, Union
from typing import TYPE_CHECKING
from typing_extensions import Protocol
if TYPE_CHECKING:
from .misc import PGnotify, ConninfoOption, PGresAttDesc
+# An object implementing the buffer protocol (ish)
+Buffer = Union[bytes, bytearray, memoryview]
+
class PGconn(Protocol):
def __init__(self, conn: Optional[PGconn] = None):
...
- def escape_literal(self, data: bytes) -> bytes:
+ def escape_literal(self, data: Buffer) -> memoryview:
...
def escape_identifier(self, data: bytes) -> bytes:
def escape_string(self, data: bytes) -> bytes:
...
- def escape_bytea(self, data: bytes) -> bytes:
+ def escape_bytea(self, data: Buffer) -> memoryview:
...
- def unescape_bytea(self, data: bytes) -> bytes:
+ def unescape_bytea(self, data: bytes) -> memoryview:
...
import string
from typing import Any, Iterator, List, Optional, Sequence, Union
+from typing import TYPE_CHECKING
from .pq import Escaping, Format
from .proto import AdaptContext
+if TYPE_CHECKING:
+ from .connection import BaseConnection
+
def quote(obj: Any, context: AdaptContext = None) -> str:
"""
rules used, otherwise only global rules are used.
"""
- return Literal(obj).as_string(context)
+ from .adapt import connection_from_context
+
+ conn = connection_from_context(context)
+ enc = conn.client_encoding if conn else "utf-8"
+ return Literal(obj).as_bytes(context).decode(enc)
class Composable(object):
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self._obj!r})"
- def as_string(self, context: AdaptContext) -> str:
+ def as_bytes(self, context: AdaptContext) -> bytes:
"""
- Return the string value of the object.
+ Return the value of the object as bytes.
- :param context: the context to evaluate the string into.
+ :param context: the context to evaluate the object into.
:type context: `connection` or `cursor`
The method is automatically invoked by `~psycopg3.Cursor.execute()`,
`!Composable` is passed instead of the query string.
"""
+ # TODO: add tests and docs for as_bytes
raise NotImplementedError
+ def as_string(self, context: AdaptContext) -> str:
+ """
+ Return the value of the object as string.
+
+ :param context: the context to evaluate the string into.
+ :type context: `connection` or `cursor`
+
+ """
+ conn = _connection_from_context(context)
+ return self.as_bytes(context).decode(conn.client_encoding)
+
def __add__(self, other: "Composable") -> "Composed":
if isinstance(other, Composed):
return Composed([self]) + other
]
super().__init__(seq)
- def as_string(self, context: AdaptContext) -> str:
- rv = []
- for obj in self._obj:
- rv.append(obj.as_string(context))
- return "".join(rv)
+ def as_bytes(self, context: AdaptContext) -> bytes:
+ return b"".join(obj.as_bytes(context) for obj in self._obj)
def __iter__(self) -> Iterator[Composable]:
return iter(self._obj)
def as_string(self, context: AdaptContext) -> str:
return self._obj
+ def as_bytes(self, context: AdaptContext) -> bytes:
+ conn = _connection_from_context(context)
+ return self._obj.encode(conn.client_encoding)
+
def format(self, *args: Any, **kwargs: Any) -> Composed:
"""
Merge `Composable` objects into a template.
def __repr__(self) -> str:
return f"{self.__class__.__name__}({', '.join(map(repr, self._obj))})"
- def as_string(self, context: AdaptContext) -> str:
- from .adapt import _connection_from_context
-
+ def as_bytes(self, context: AdaptContext) -> bytes:
conn = _connection_from_context(context)
- if not conn:
- raise ValueError(f"no connection in the context: {context}")
-
esc = Escaping(conn.pgconn)
enc = conn.client_encoding
escs = [esc.escape_identifier(s.encode(enc)) for s in self._obj]
- return b".".join(escs).decode(enc)
+ return b".".join(escs)
class Literal(Composable):
"""
- def as_string(self, context: AdaptContext) -> str:
- from .adapt import _connection_from_context, Transformer
+ def as_bytes(self, context: AdaptContext) -> bytes:
+ from .adapt import Transformer
- conn = _connection_from_context(context)
- tx = context if isinstance(context, Transformer) else Transformer(conn)
+ tx = context if isinstance(context, Transformer) else Transformer()
dumper = tx.get_dumper(self._obj, Format.TEXT)
- quoted = dumper.quote(self._obj)
- return quoted.decode(conn.client_encoding if conn else "utf-8")
+ return dumper.quote(self._obj)
class Placeholder(Composable):
code = "s" if self._format == Format.TEXT else "b"
return f"%({self._obj}){code}" if self._obj else f"%{code}"
+ def as_bytes(self, context: AdaptContext) -> bytes:
+ conn = _connection_from_context(context)
+ return self.as_string(context).encode(conn.client_encoding)
+
# Literals
NULL = SQL("NULL")
DEFAULT = SQL("DEFAULT")
+
+
+def _connection_from_context(context: AdaptContext) -> "BaseConnection":
+ from .adapt import connection_from_context
+
+ conn = connection_from_context(context)
+ if not conn:
+ raise ValueError(f"no connection in the context: {context}")
+
+ return conn
# Double quotes and backslashes embedded in element values will be
# backslash-escaped.
- _re_escape = re.compile(br'(["\\])')
+ _re_esc = re.compile(br'(["\\])')
def dump(self, obj: List[Any]) -> bytes:
tokens: List[bytes] = []
dumper = self._tx.get_dumper(item, Format.TEXT)
ad = dumper.dump(item)
if self._re_needs_quotes.search(ad):
- ad = b'"' + self._re_escape.sub(br"\\\1", ad) + b'"'
+ ad = (
+ b'"' + self._re_esc.sub(br"\\\1", bytes(ad)) + b'"'
+ )
tokens.append(ad)
if not oid:
oid = dumper.oid
if not ad:
ad = b'""'
elif self._re_needs_quotes.search(ad):
- ad = b'"' + self._re_escape.sub(br"\1\1", ad) + b'"'
+ ad = b'"' + self._re_esc.sub(br"\1\1", ad) + b'"'
parts.append(ad)
parts.append(sep)
return b"".join(parts)
_re_needs_quotes = re.compile(br'[",\\\s()]')
- _re_escape = re.compile(br"([\\\"])")
+ _re_esc = re.compile(br"([\\\"])")
@Dumper.text(tuple)
return data.decode(self.encoding)
-class _BinaryDumper(Dumper):
+@Dumper.text(bytes)
+@Dumper.text(bytearray)
+@Dumper.text(memoryview)
+class BytesDumper(Dumper):
oid = builtins["bytea"].oid
def __init__(self, src: type, context: AdaptContext = None):
self.connection.pgconn if self.connection else None
)
-
-@Dumper.text(bytes)
-class BytesDumper(_BinaryDumper):
- def dump(self, obj: bytes) -> bytes:
+ def dump(self, obj: bytes) -> memoryview:
+ # TODO: mypy doesn't complain, but this function has the wrong signature
+ # probably dump return value should be extended to Buffer
return self.esc.escape_bytea(obj)
-@Dumper.text(bytearray)
-class BytearrayDumper(_BinaryDumper):
- def dump(self, obj: bytearray) -> bytes:
- return self.esc.escape_bytea(bytes(obj))
-
-
-@Dumper.text(memoryview)
-class MemoryviewDumper(_BinaryDumper):
- def dump(self, obj: memoryview) -> bytes:
- return self.esc.escape_bytea(bytes(obj))
-
-
@Dumper.binary(bytes)
@Dumper.binary(bytearray)
@Dumper.binary(memoryview)
def dump(
self, obj: Union[bytes, bytearray, memoryview]
) -> Union[bytes, bytearray, memoryview]:
+ # TODO: mypy doesn't complain, but this function has the wrong signature
return obj
cdef object _src
cdef object _context
cdef object _connection
+ cdef PGconn _pgconn
def __init__(self, src: type, context: AdaptContext = None):
self._src = src
self._context = context
self._connection = _connection_from_context(context)
+ self._pgconn = (
+ self._connection.pgconn if self._connection is not None else None
+ )
@property
def src(self) -> type:
def quote(self, obj: Any) -> bytes:
# TODO: can be optimized
- cdef bytes value = self.dump(obj)
+ cdef object ovalue = self.dump(obj)
+
+ cdef bytes value
+ if isinstance(ovalue, bytes):
+ value = ovalue
+ else:
+ value = bytes(ovalue)
+
cdef bytes tmp
cdef Escaping esc
if self.connection:
- esc = Escaping(self.connection.pgconn)
- return esc.escape_literal(value)
+ esc = Escaping(self._pgconn)
+ return bytes(esc.escape_literal(value))
else:
esc = Escaping()
cdef _connection_from_context(object context):
- from psycopg3.adapt import _connection_from_context
- return _connection_from_context(context)
+ from psycopg3.adapt import connection_from_context
+ return connection_from_context(context)
def register_builtin_c_adapters():
cdef class Escaping:
cdef PGconn conn
+
+
+cdef class PQBuffer:
+ cdef unsigned char *buf
+ cdef Py_ssize_t len
+
+ @staticmethod
+ cdef PQBuffer _from_buffer(unsigned char *buf, Py_ssize_t len)
# Copyright (C) 2020 The Psycopg Team
+from libc.string cimport strlen
from posix.unistd cimport getpid
from cpython.mem cimport PyMem_Malloc, PyMem_Free
from cpython.bytes cimport PyBytes_AsString, PyBytes_AsStringAndSize
cdef int *alenghts = NULL
cdef char *ptr
cdef Py_ssize_t length
- cdef Py_buffer buf
if nparams:
aparams = <char **>PyMem_Malloc(nparams * sizeof(char *))
if obj is None:
aparams[i] = NULL
alenghts[i] = 0
- elif isinstance(obj, bytes):
- PyBytes_AsStringAndSize(obj, &ptr, &length)
+ else:
+ # TODO: it is a leak if this fails (but it should only fail
+ # on internal error, e.g. if obj is not a buffer)
+ _buffer_as_string_and_size(obj, &ptr, &length)
aparams[i] = ptr
alenghts[i] = length
- elif PyObject_CheckBuffer(obj):
- PyObject_GetBuffer(obj, &buf, PyBUF_SIMPLE)
- aparams[i] = <char *>buf.buf
- alenghts[i] = buf.len
- PyBuffer_Release(&buf)
- else:
- raise TypeError(f"bytes or buffer expected, got {type(obj)}")
cdef Oid *atypes = NULL
if param_types is not None:
def __repr__(self):
return f"<{type(self).__name__} ({self.keyword.decode('ascii')})>"
-
cdef class Escaping:
def __init__(self, conn: Optional[PGconn] = None):
self.conn = conn
- def escape_literal(self, data: bytes) -> bytes:
+ def escape_literal(self, data: "Buffer") -> memoryview:
cdef char *out
cdef bytes rv
+ cdef char *ptr
+ cdef Py_ssize_t length
- if self.conn is not None:
- if self.conn.pgconn_ptr is NULL:
- raise PQerror("the connection is closed")
- out = impl.PQescapeLiteral(self.conn.pgconn_ptr, data, len(data))
- if out is NULL:
- raise PQerror(
- f"escape_literal failed: {error_message(self.conn)}"
- )
- rv = out
- impl.PQfreemem(out)
- return rv
-
- else:
+ if self.conn is None:
raise PQerror("escape_literal failed: no connection provided")
+ if self.conn.pgconn_ptr is NULL:
+ raise PQerror("the connection is closed")
+ _buffer_as_string_and_size(data, &ptr, &length)
+
+ out = impl.PQescapeLiteral(self.conn.pgconn_ptr, ptr, length)
+ if out is NULL:
+ raise PQerror(
+ f"escape_literal failed: {error_message(self.conn)}"
+ )
+
+ return memoryview(PQBuffer._from_buffer(<unsigned char *>out, strlen(out)))
+
+ # TODO: return PQBuffer
def escape_identifier(self, data: bytes) -> bytes:
cdef char *out
cdef bytes rv
PyMem_Free(out)
return rv
-
- def escape_bytea(self, data: bytes) -> bytes:
+ def escape_bytea(self, data: "Buffer") -> memoryview:
cdef size_t len_out
cdef unsigned char *out
+ cdef char *ptr
+ cdef Py_ssize_t length
+
+ if self.conn is not None and self.conn.pgconn_ptr is NULL:
+ raise PQerror("the connection is closed")
+
+ _buffer_as_string_and_size(data, &ptr, &length)
+
if self.conn is not None:
- if self.conn.pgconn_ptr is NULL:
- raise PQerror("the connection is closed")
out = impl.PQescapeByteaConn(
- self.conn.pgconn_ptr, data, len(data), &len_out)
+ self.conn.pgconn_ptr, <unsigned char *>ptr, length, &len_out)
else:
- out = impl.PQescapeBytea(data, len(data), &len_out)
+ out = impl.PQescapeBytea(<unsigned char *>ptr, length, &len_out)
+
if out is NULL:
raise MemoryError(
f"couldn't allocate for escape_bytea of {len(data)} bytes"
)
- # TODO: without copy?
- rv = out[:len_out - 1] # out includes final 0
- impl.PQfreemem(out)
- return rv
+ return memoryview(
+ PQBuffer._from_buffer(out, len_out - 1) # out includes final 0
+ )
- def unescape_bytea(self, data: bytes) -> bytes:
+ def unescape_bytea(self, data: bytes) -> memoryview:
# not needed, but let's keep it symmetric with the escaping:
# if a connection is passed in, it must be valid.
if self.conn is not None:
f"couldn't allocate for unescape_bytea of {len(data)} bytes"
)
- rv = out[:len_out]
- impl.PQfreemem(out)
+ return memoryview(PQBuffer._from_buffer(out, len_out))
+
+
+cdef class PQBuffer:
+ """
+ Wrap a chunk of memory allocated by the libpq and expose it as memoryview.
+ """
+ @staticmethod
+ cdef PQBuffer _from_buffer(unsigned char *buf, Py_ssize_t len):
+ cdef PQBuffer rv = PQBuffer.__new__(PQBuffer)
+ rv.buf = buf
+ rv.len = len
return rv
+
+ def __cinit__(self):
+ self.buf = NULL
+ self.len = 0
+
+ def __dealloc__(self):
+ if self.buf:
+ impl.PQfreemem(self.buf)
+
+ def __repr__(self):
+ return (
+ f"{self.__class__.__module__}.{self.__class__.__qualname__}"
+ f"({bytes(self)})"
+ )
+
+ def __getbuffer__(self, Py_buffer *buffer, int flags):
+ buffer.buf = self.buf
+ buffer.obj = self
+ buffer.len = self.len
+ buffer.itemsize = sizeof(unsigned char)
+ buffer.readonly = 1
+ buffer.ndim = 1
+ buffer.format = NULL # unsigned char
+ buffer.shape = &self.len
+ buffer.strides = NULL
+ buffer.suboffsets = NULL
+ buffer.internal = NULL
+
+ def __releasebuffer__(self, Py_buffer *buffer):
+ pass
+
+
+cdef int _buffer_as_string_and_size(data: "Buffer", char **ptr, Py_ssize_t *length) except -1:
+ cdef Py_buffer buf
+
+ if isinstance(data, bytes):
+ PyBytes_AsStringAndSize(data, ptr, length)
+ elif PyObject_CheckBuffer(data):
+ PyObject_GetBuffer(data, &buf, PyBUF_SIMPLE)
+ ptr[0] = <char *>buf.buf
+ length[0] = buf.len
+ PyBuffer_Release(&buf)
+ else:
+ raise TypeError(f"bytes or buffer expected, got {type(data)}")
from cpython.unicode cimport PyUnicode_AsUTF8String, PyUnicode_AsEncodedString
from psycopg3_c cimport libpq, oids
+from psycopg3_c.pq_cython cimport Escaping
cdef class _StringDumper(CDumper):
return data[:length]
+cdef class BytesDumper(CDumper):
+ cdef Escaping esc
+
+ def __init__(self, src: type, context: AdaptContext):
+ super().__init__(src, context)
+ self.esc = Escaping(self._pgconn)
+
+ @property
+ def oid(self) -> int:
+ return oids.BYTEA_OID
+
+ def dump(self, obj) -> memoryview:
+ return self.esc.escape_bytea(obj)
+
+
+cdef class BytesBinaryDumper(BytesDumper):
+ def dump(self, obj):
+ return obj
+
+
cdef class ByteaLoader(CLoader):
cdef object cload(self, const char *data, size_t length):
cdef size_t len_out
TextLoader.register(oids.VARCHAR_OID)
TextLoader.register(oids.VARCHAR_OID, format=Format.BINARY)
+ BytesDumper.register(bytes)
+ BytesDumper.register(bytearray)
+ BytesDumper.register(memoryview)
+ BytesBinaryDumper.register(bytes, format=Format.BINARY)
+ BytesBinaryDumper.register(bytearray, format=Format.BINARY)
+ BytesBinaryDumper.register(memoryview, format=Format.BINARY)
+
ByteaLoader.register(oids.BYTEA_OID)
ByteaBinaryLoader.register(oids.BYTEA_OID, format=Format.BINARY)