From: Daniele Varrazzo Date: Thu, 17 Dec 2020 04:02:35 +0000 (+0100) Subject: Added buffer object to wrap libpq memory X-Git-Tag: 3.0.dev0~264 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e9be5632d12f4ea98ee135b94ed69a1f00391c40;p=thirdparty%2Fpsycopg.git Added buffer object to wrap libpq memory Several code paths were extended to deal with objects implementing the buffer protocol instead of just bytes: this allows fewer memory copies to be performed. Added sql.Composable.as_bytes method to use in preference to as_string (so that decoding can be skipped altogether). --- diff --git a/psycopg3/psycopg3/adapt.py b/psycopg3/psycopg3/adapt.py index 7938d4118..03eca5904 100644 --- a/psycopg3/psycopg3/adapt.py +++ b/psycopg3/psycopg3/adapt.py @@ -29,7 +29,7 @@ class Dumper(ABC): def __init__(self, src: type, context: AdaptContext = None): self.src = src self.context = context - self.connection = _connection_from_context(context) + self.connection = connection_from_context(context) @abstractmethod def dump(self, obj: Any) -> bytes: @@ -100,7 +100,7 @@ class Loader(ABC): def __init__(self, oid: int, context: AdaptContext = None): self.oid = oid self.context = context - self.connection = _connection_from_context(context) + self.connection = connection_from_context(context) @abstractmethod def load(self, data: bytes) -> Any: @@ -142,7 +142,7 @@ class Loader(ABC): return binary_ -def _connection_from_context( +def connection_from_context( context: AdaptContext, ) -> Optional[BaseConnection]: if not context: diff --git a/psycopg3/psycopg3/pq/pq_ctypes.py b/psycopg3/psycopg3/pq/pq_ctypes.py index 546d9c210..04e7d637b 100644 --- a/psycopg3/psycopg3/pq/pq_ctypes.py +++ b/psycopg3/psycopg3/pq/pq_ctypes.py @@ -799,9 +799,12 @@ class Escaping: def __init__(self, conn: Optional[PGconn] = None): self.conn = conn - def escape_literal(self, data: bytes) -> bytes: + def escape_literal(self, data: "proto.Buffer") -> memoryview: if self.conn: self.conn._ensure_pgconn() + # TODO: 
might be done without copy (however C does that) + if not isinstance(data, bytes): + data = bytes(data) out = impl.PQescapeLiteral(self.conn.pgconn_ptr, data, len(data)) if not out: raise PQerror( @@ -809,7 +812,7 @@ class Escaping: ) rv = string_at(out) impl.PQfreemem(out) - return rv + return memoryview(rv) else: raise PQerror("escape_literal failed: no connection provided") @@ -859,8 +862,13 @@ class Escaping: ) return out.value - def escape_bytea(self, data: bytes) -> bytes: + def escape_bytea(self, data: "proto.Buffer") -> memoryview: len_out = c_size_t() + # TODO: might be able to do without a copy but it's a mess. + # the C library does it better anyway, so maybe not worth optimising + # https://mail.python.org/pipermail/python-dev/2012-September/121780.html + if not isinstance(data, bytes): + data = bytes(data) if self.conn: self.conn._ensure_pgconn() out = impl.PQescapeByteaConn( @@ -880,9 +888,9 @@ class Escaping: rv = string_at(out, len_out.value - 1) # out includes final 0 impl.PQfreemem(out) - return rv + return memoryview(rv) - def unescape_bytea(self, data: bytes) -> bytes: + def unescape_bytea(self, data: bytes) -> memoryview: # not needed, but let's keep it symmetric with the escaping: # if a connection is passed in, it must be valid. if self.conn: @@ -897,4 +905,4 @@ class Escaping: rv = string_at(out, len_out.value) impl.PQfreemem(out) - return rv + return memoryview(rv) diff --git a/psycopg3/psycopg3/pq/proto.py b/psycopg3/psycopg3/pq/proto.py index 145d0a291..82ed003b8 100644 --- a/psycopg3/psycopg3/pq/proto.py +++ b/psycopg3/psycopg3/pq/proto.py @@ -4,7 +4,7 @@ Protocol objects to represent objects exposed by different pq implementations. 
# Copyright (C) 2020 The Psycopg Team -from typing import Any, Callable, List, Optional, Sequence, Tuple +from typing import Any, Callable, List, Optional, Sequence, Tuple, Union from typing import TYPE_CHECKING from typing_extensions import Protocol @@ -14,6 +14,9 @@ from ._enums import Ping, PollingStatus, TransactionStatus if TYPE_CHECKING: from .misc import PGnotify, ConninfoOption, PGresAttDesc +# An object implementing the buffer protocol (ish) +Buffer = Union[bytes, bytearray, memoryview] + class PGconn(Protocol): @@ -335,7 +338,7 @@ class Escaping(Protocol): def __init__(self, conn: Optional[PGconn] = None): ... - def escape_literal(self, data: bytes) -> bytes: + def escape_literal(self, data: Buffer) -> memoryview: ... def escape_identifier(self, data: bytes) -> bytes: @@ -344,8 +347,8 @@ class Escaping(Protocol): def escape_string(self, data: bytes) -> bytes: ... - def escape_bytea(self, data: bytes) -> bytes: + def escape_bytea(self, data: Buffer) -> memoryview: ... - def unescape_bytea(self, data: bytes) -> bytes: + def unescape_bytea(self, data: bytes) -> memoryview: ... diff --git a/psycopg3/psycopg3/sql.py b/psycopg3/psycopg3/sql.py index b0dbcb5d4..ba7587ec3 100644 --- a/psycopg3/psycopg3/sql.py +++ b/psycopg3/psycopg3/sql.py @@ -6,10 +6,14 @@ SQL composition utility module import string from typing import Any, Iterator, List, Optional, Sequence, Union +from typing import TYPE_CHECKING from .pq import Escaping, Format from .proto import AdaptContext +if TYPE_CHECKING: + from .connection import BaseConnection + def quote(obj: Any, context: AdaptContext = None) -> str: """ @@ -24,7 +28,11 @@ def quote(obj: Any, context: AdaptContext = None) -> str: rules used, otherwise only global rules are used. 
""" - return Literal(obj).as_string(context) + from .adapt import connection_from_context + + conn = connection_from_context(context) + enc = conn.client_encoding if conn else "utf-8" + return Literal(obj).as_bytes(context).decode(enc) class Composable(object): @@ -48,11 +56,11 @@ class Composable(object): def __repr__(self) -> str: return f"{self.__class__.__name__}({self._obj!r})" - def as_string(self, context: AdaptContext) -> str: + def as_bytes(self, context: AdaptContext) -> bytes: """ - Return the string value of the object. + Return the value of the object as bytes. - :param context: the context to evaluate the string into. + :param context: the context to evaluate the object into. :type context: `connection` or `cursor` The method is automatically invoked by `~psycopg3.Cursor.execute()`, @@ -60,8 +68,20 @@ class Composable(object): `!Composable` is passed instead of the query string. """ + # TODO: add tests and docs for as_bytes raise NotImplementedError + def as_string(self, context: AdaptContext) -> str: + """ + Return the value of the object as string. + + :param context: the context to evaluate the string into. 
+ :type context: `connection` or `cursor` + + """ + conn = _connection_from_context(context) + return self.as_bytes(context).decode(conn.client_encoding) + def __add__(self, other: "Composable") -> "Composed": if isinstance(other, Composed): return Composed([self]) + other @@ -108,11 +128,8 @@ class Composed(Composable): ] super().__init__(seq) - def as_string(self, context: AdaptContext) -> str: - rv = [] - for obj in self._obj: - rv.append(obj.as_string(context)) - return "".join(rv) + def as_bytes(self, context: AdaptContext) -> bytes: + return b"".join(obj.as_bytes(context) for obj in self._obj) def __iter__(self) -> Iterator[Composable]: return iter(self._obj) @@ -184,6 +201,10 @@ class SQL(Composable): def as_string(self, context: AdaptContext) -> str: return self._obj + def as_bytes(self, context: AdaptContext) -> bytes: + conn = _connection_from_context(context) + return self._obj.encode(conn.client_encoding) + def format(self, *args: Any, **kwargs: Any) -> Composed: """ Merge `Composable` objects into a template. 
@@ -335,17 +356,12 @@ class Identifier(Composable): def __repr__(self) -> str: return f"{self.__class__.__name__}({', '.join(map(repr, self._obj))})" - def as_string(self, context: AdaptContext) -> str: - from .adapt import _connection_from_context - + def as_bytes(self, context: AdaptContext) -> bytes: conn = _connection_from_context(context) - if not conn: - raise ValueError(f"no connection in the context: {context}") - esc = Escaping(conn.pgconn) enc = conn.client_encoding escs = [esc.escape_identifier(s.encode(enc)) for s in self._obj] - return b".".join(escs).decode(enc) + return b".".join(escs) class Literal(Composable): @@ -369,14 +385,12 @@ class Literal(Composable): """ - def as_string(self, context: AdaptContext) -> str: - from .adapt import _connection_from_context, Transformer + def as_bytes(self, context: AdaptContext) -> bytes: + from .adapt import Transformer - conn = _connection_from_context(context) - tx = context if isinstance(context, Transformer) else Transformer(conn) + tx = context if isinstance(context, Transformer) else Transformer() dumper = tx.get_dumper(self._obj, Format.TEXT) - quoted = dumper.quote(self._obj) - return quoted.decode(conn.client_encoding if conn else "utf-8") + return dumper.quote(self._obj) class Placeholder(Composable): @@ -430,7 +444,21 @@ class Placeholder(Composable): code = "s" if self._format == Format.TEXT else "b" return f"%({self._obj}){code}" if self._obj else f"%{code}" + def as_bytes(self, context: AdaptContext) -> bytes: + conn = _connection_from_context(context) + return self.as_string(context).encode(conn.client_encoding) + # Literals NULL = SQL("NULL") DEFAULT = SQL("DEFAULT") + + +def _connection_from_context(context: AdaptContext) -> "BaseConnection": + from .adapt import connection_from_context + + conn = connection_from_context(context) + if not conn: + raise ValueError(f"no connection in the context: {context}") + + return conn diff --git a/psycopg3/psycopg3/types/array.py 
b/psycopg3/psycopg3/types/array.py index 03fc21bdc..6599e3339 100644 --- a/psycopg3/psycopg3/types/array.py +++ b/psycopg3/psycopg3/types/array.py @@ -62,7 +62,7 @@ class ListDumper(BaseListDumper): # Double quotes and backslashes embedded in element values will be # backslash-escaped. - _re_escape = re.compile(br'(["\\])') + _re_esc = re.compile(br'(["\\])') def dump(self, obj: List[Any]) -> bytes: tokens: List[bytes] = [] @@ -83,7 +83,9 @@ class ListDumper(BaseListDumper): dumper = self._tx.get_dumper(item, Format.TEXT) ad = dumper.dump(item) if self._re_needs_quotes.search(ad): - ad = b'"' + self._re_escape.sub(br"\\\1", ad) + b'"' + ad = ( + b'"' + self._re_esc.sub(br"\\\1", bytes(ad)) + b'"' + ) tokens.append(ad) if not oid: oid = dumper.oid diff --git a/psycopg3/psycopg3/types/composite.py b/psycopg3/psycopg3/types/composite.py index 3e5c42d47..f48a398ec 100644 --- a/psycopg3/psycopg3/types/composite.py +++ b/psycopg3/psycopg3/types/composite.py @@ -169,7 +169,7 @@ class SequenceDumper(Dumper): if not ad: ad = b'""' elif self._re_needs_quotes.search(ad): - ad = b'"' + self._re_escape.sub(br"\1\1", ad) + b'"' + ad = b'"' + self._re_esc.sub(br"\1\1", ad) + b'"' parts.append(ad) parts.append(sep) @@ -179,7 +179,7 @@ class SequenceDumper(Dumper): return b"".join(parts) _re_needs_quotes = re.compile(br'[",\\\s()]') - _re_escape = re.compile(br"([\\\"])") + _re_esc = re.compile(br"([\\\"])") @Dumper.text(tuple) diff --git a/psycopg3/psycopg3/types/text.py b/psycopg3/psycopg3/types/text.py index bd04dc14b..8315b367a 100644 --- a/psycopg3/psycopg3/types/text.py +++ b/psycopg3/psycopg3/types/text.py @@ -86,7 +86,10 @@ class UnknownLoader(Loader): return data.decode(self.encoding) -class _BinaryDumper(Dumper): +@Dumper.text(bytes) +@Dumper.text(bytearray) +@Dumper.text(memoryview) +class BytesDumper(Dumper): oid = builtins["bytea"].oid def __init__(self, src: type, context: AdaptContext = None): @@ -95,25 +98,12 @@ class _BinaryDumper(Dumper): self.connection.pgconn if 
self.connection else None ) - -@Dumper.text(bytes) -class BytesDumper(_BinaryDumper): - def dump(self, obj: bytes) -> bytes: + def dump(self, obj: bytes) -> memoryview: + # TODO: mypy doesn't complain, but this function has the wrong signature + # probably dump return value should be extended to Buffer return self.esc.escape_bytea(obj) -@Dumper.text(bytearray) -class BytearrayDumper(_BinaryDumper): - def dump(self, obj: bytearray) -> bytes: - return self.esc.escape_bytea(bytes(obj)) - - -@Dumper.text(memoryview) -class MemoryviewDumper(_BinaryDumper): - def dump(self, obj: memoryview) -> bytes: - return self.esc.escape_bytea(bytes(obj)) - - @Dumper.binary(bytes) @Dumper.binary(bytearray) @Dumper.binary(memoryview) @@ -124,6 +114,7 @@ class BytesBinaryDumper(Dumper): def dump( self, obj: Union[bytes, bytearray, memoryview] ) -> Union[bytes, bytearray, memoryview]: + # TODO: mypy doesn't complain, but this function has the wrong signature return obj diff --git a/psycopg3_c/psycopg3_c/adapt.pyx b/psycopg3_c/psycopg3_c/adapt.pyx index fb18831e6..761ec5816 100644 --- a/psycopg3_c/psycopg3_c/adapt.pyx +++ b/psycopg3_c/psycopg3_c/adapt.pyx @@ -31,11 +31,15 @@ cdef class CDumper: cdef object _src cdef object _context cdef object _connection + cdef PGconn _pgconn def __init__(self, src: type, context: AdaptContext = None): self._src = src self._context = context self._connection = _connection_from_context(context) + self._pgconn = ( + self._connection.pgconn if self._connection is not None else None + ) @property def src(self) -> type: @@ -54,13 +58,20 @@ cdef class CDumper: def quote(self, obj: Any) -> bytes: # TODO: can be optimized - cdef bytes value = self.dump(obj) + cdef object ovalue = self.dump(obj) + + cdef bytes value + if isinstance(ovalue, bytes): + value = ovalue + else: + value = bytes(ovalue) + cdef bytes tmp cdef Escaping esc if self.connection: - esc = Escaping(self.connection.pgconn) - return esc.escape_literal(value) + esc = Escaping(self._pgconn) + 
return bytes(esc.escape_literal(value)) else: esc = Escaping() @@ -138,8 +149,8 @@ cdef class CLoader: cdef _connection_from_context(object context): - from psycopg3.adapt import _connection_from_context - return _connection_from_context(context) + from psycopg3.adapt import connection_from_context + return connection_from_context(context) def register_builtin_c_adapters(): diff --git a/psycopg3_c/psycopg3_c/pq_cython.pxd b/psycopg3_c/psycopg3_c/pq_cython.pxd index 8b94ea0d8..faeb46094 100644 --- a/psycopg3_c/psycopg3_c/pq_cython.pxd +++ b/psycopg3_c/psycopg3_c/pq_cython.pxd @@ -36,3 +36,11 @@ cdef class PGcancel: cdef class Escaping: cdef PGconn conn + + +cdef class PQBuffer: + cdef unsigned char *buf + cdef Py_ssize_t len + + @staticmethod + cdef PQBuffer _from_buffer(unsigned char *buf, Py_ssize_t len) diff --git a/psycopg3_c/psycopg3_c/pq_cython.pyx b/psycopg3_c/psycopg3_c/pq_cython.pyx index f4c319720..8582e6f31 100644 --- a/psycopg3_c/psycopg3_c/pq_cython.pyx +++ b/psycopg3_c/psycopg3_c/pq_cython.pyx @@ -4,6 +4,7 @@ libpq Python wrapper using cython bindings. # Copyright (C) 2020 The Psycopg Team +from libc.string cimport strlen from posix.unistd cimport getpid from cpython.mem cimport PyMem_Malloc, PyMem_Free from cpython.bytes cimport PyBytes_AsString, PyBytes_AsStringAndSize @@ -534,7 +535,6 @@ cdef (int, Oid *, char * const*, int *, int *) _query_params_args( cdef int *alenghts = NULL cdef char *ptr cdef Py_ssize_t length - cdef Py_buffer buf if nparams: aparams = PyMem_Malloc(nparams * sizeof(char *)) @@ -544,17 +544,12 @@ cdef (int, Oid *, char * const*, int *, int *) _query_params_args( if obj is None: aparams[i] = NULL alenghts[i] = 0 - elif isinstance(obj, bytes): - PyBytes_AsStringAndSize(obj, &ptr, &length) + else: + # TODO: it is a leak if this fails (but it should only fail + # on internal error, e.g. 
if obj is not a buffer) + _buffer_as_string_and_size(obj, &ptr, &length) aparams[i] = ptr alenghts[i] = length - elif PyObject_CheckBuffer(obj): - PyObject_GetBuffer(obj, &buf, PyBUF_SIMPLE) - aparams[i] = buf.buf - alenghts[i] = buf.len - PyBuffer_Release(&buf) - else: - raise TypeError(f"bytes or buffer expected, got {type(obj)}") cdef Oid *atypes = NULL if param_types is not None: @@ -803,30 +798,32 @@ class Conninfo: def __repr__(self): return f"<{type(self).__name__} ({self.keyword.decode('ascii')})>" - cdef class Escaping: def __init__(self, conn: Optional[PGconn] = None): self.conn = conn - def escape_literal(self, data: bytes) -> bytes: + def escape_literal(self, data: "Buffer") -> memoryview: cdef char *out cdef bytes rv + cdef char *ptr + cdef Py_ssize_t length - if self.conn is not None: - if self.conn.pgconn_ptr is NULL: - raise PQerror("the connection is closed") - out = impl.PQescapeLiteral(self.conn.pgconn_ptr, data, len(data)) - if out is NULL: - raise PQerror( - f"escape_literal failed: {error_message(self.conn)}" - ) - rv = out - impl.PQfreemem(out) - return rv - - else: + if self.conn is None: raise PQerror("escape_literal failed: no connection provided") + if self.conn.pgconn_ptr is NULL: + raise PQerror("the connection is closed") + _buffer_as_string_and_size(data, &ptr, &length) + + out = impl.PQescapeLiteral(self.conn.pgconn_ptr, ptr, length) + if out is NULL: + raise PQerror( + f"escape_literal failed: {error_message(self.conn)}" + ) + + return memoryview(PQBuffer._from_buffer(out, strlen(out))) + + # TODO: return PQBuffer def escape_identifier(self, data: bytes) -> bytes: cdef char *out cdef bytes rv @@ -879,28 +876,33 @@ cdef class Escaping: PyMem_Free(out) return rv - - def escape_bytea(self, data: bytes) -> bytes: + def escape_bytea(self, data: "Buffer") -> memoryview: cdef size_t len_out cdef unsigned char *out + cdef char *ptr + cdef Py_ssize_t length + + if self.conn is not None and self.conn.pgconn_ptr is NULL: + raise PQerror("the 
connection is closed") + + _buffer_as_string_and_size(data, &ptr, &length) + if self.conn is not None: - if self.conn.pgconn_ptr is NULL: - raise PQerror("the connection is closed") out = impl.PQescapeByteaConn( - self.conn.pgconn_ptr, data, len(data), &len_out) + self.conn.pgconn_ptr, ptr, length, &len_out) else: - out = impl.PQescapeBytea(data, len(data), &len_out) + out = impl.PQescapeBytea(ptr, length, &len_out) + if out is NULL: raise MemoryError( f"couldn't allocate for escape_bytea of {len(data)} bytes" ) - # TODO: without copy? - rv = out[:len_out - 1] # out includes final 0 - impl.PQfreemem(out) - return rv + return memoryview( + PQBuffer._from_buffer(out, len_out - 1) # out includes final 0 + ) - def unescape_bytea(self, data: bytes) -> bytes: + def unescape_bytea(self, data: bytes) -> memoryview: # not needed, but let's keep it symmetric with the escaping: # if a connection is passed in, it must be valid. if self.conn is not None: @@ -914,6 +916,60 @@ cdef class Escaping: f"couldn't allocate for unescape_bytea of {len(data)} bytes" ) - rv = out[:len_out] - impl.PQfreemem(out) + return memoryview(PQBuffer._from_buffer(out, len_out)) + + +cdef class PQBuffer: + """ + Wrap a chunk of memory allocated by the libpq and expose it as memoryview. 
+ """ + @staticmethod + cdef PQBuffer _from_buffer(unsigned char *buf, Py_ssize_t len): + cdef PQBuffer rv = PQBuffer.__new__(PQBuffer) + rv.buf = buf + rv.len = len return rv + + def __cinit__(self): + self.buf = NULL + self.len = 0 + + def __dealloc__(self): + if self.buf: + impl.PQfreemem(self.buf) + + def __repr__(self): + return ( + f"{self.__class__.__module__}.{self.__class__.__qualname__}" + f"({bytes(self)})" + ) + + def __getbuffer__(self, Py_buffer *buffer, int flags): + buffer.buf = self.buf + buffer.obj = self + buffer.len = self.len + buffer.itemsize = sizeof(unsigned char) + buffer.readonly = 1 + buffer.ndim = 1 + buffer.format = NULL # unsigned char + buffer.shape = &self.len + buffer.strides = NULL + buffer.suboffsets = NULL + buffer.internal = NULL + + def __releasebuffer__(self, Py_buffer *buffer): + pass + + +cdef int _buffer_as_string_and_size(data: "Buffer", char **ptr, Py_ssize_t *length) except -1: + cdef Py_buffer buf + + if isinstance(data, bytes): + PyBytes_AsStringAndSize(data, ptr, length) + elif PyObject_CheckBuffer(data): + PyObject_GetBuffer(data, &buf, PyBUF_SIMPLE) + ptr[0] = buf.buf + length[0] = buf.len + PyBuffer_Release(&buf) + else: + raise TypeError(f"bytes or buffer expected, got {type(data)}") diff --git a/psycopg3_c/psycopg3_c/types/text.pyx b/psycopg3_c/psycopg3_c/types/text.pyx index 2bcd6e463..cf3b73a81 100644 --- a/psycopg3_c/psycopg3_c/types/text.pyx +++ b/psycopg3_c/psycopg3_c/types/text.pyx @@ -9,6 +9,7 @@ from cpython.unicode cimport PyUnicode_Decode, PyUnicode_DecodeUTF8 from cpython.unicode cimport PyUnicode_AsUTF8String, PyUnicode_AsEncodedString from psycopg3_c cimport libpq, oids +from psycopg3_c.pq_cython cimport Escaping cdef class _StringDumper(CDumper): @@ -94,6 +95,26 @@ cdef class TextLoader(CLoader): return data[:length] +cdef class BytesDumper(CDumper): + cdef Escaping esc + + def __init__(self, src: type, context: AdaptContext): + super().__init__(src, context) + self.esc = Escaping(self._pgconn) + + 
@property + def oid(self) -> int: + return oids.BYTEA_OID + + def dump(self, obj) -> memoryview: + return self.esc.escape_bytea(obj) + + +cdef class BytesBinaryDumper(BytesDumper): + def dump(self, obj): + return obj + + cdef class ByteaLoader(CLoader): cdef object cload(self, const char *data, size_t length): cdef size_t len_out @@ -126,5 +147,12 @@ cdef void register_text_c_adapters(): TextLoader.register(oids.VARCHAR_OID) TextLoader.register(oids.VARCHAR_OID, format=Format.BINARY) + BytesDumper.register(bytes) + BytesDumper.register(bytearray) + BytesDumper.register(memoryview) + BytesBinaryDumper.register(bytes, format=Format.BINARY) + BytesBinaryDumper.register(bytearray, format=Format.BINARY) + BytesBinaryDumper.register(memoryview, format=Format.BINARY) + ByteaLoader.register(oids.BYTEA_OID) ByteaBinaryLoader.register(oids.BYTEA_OID, format=Format.BINARY)