From: Daniele Varrazzo Date: Fri, 1 Jan 2021 17:51:32 +0000 (+0100) Subject: Added C helper for binary copy formatting X-Git-Tag: 3.0.dev0~209 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3200acfb2027dfd740b86fb286b5b6b928ea5898;p=thirdparty%2Fpsycopg.git Added C helper for binary copy formatting --- diff --git a/psycopg3/psycopg3/copy.py b/psycopg3/psycopg3/copy.py index 999594471..0fbc10b35 100644 --- a/psycopg3/psycopg3/copy.py +++ b/psycopg3/psycopg3/copy.py @@ -6,13 +6,13 @@ psycopg3 copy support import re import struct -from typing import TYPE_CHECKING, AsyncIterator, Iterator, Generic +from typing import TYPE_CHECKING, AsyncIterator, Callable, Iterator, Generic from typing import Any, Dict, List, Match, Optional, Sequence, Type, Union from types import TracebackType from . import pq from .pq import Format, ExecStatus -from .proto import ConnectionType, PQGen +from .proto import ConnectionType, PQGen, Transformer from .generators import copy_from, copy_to, copy_end if TYPE_CHECKING: @@ -20,6 +20,8 @@ if TYPE_CHECKING: from .cursor import BaseCursor # noqa: F401 from .connection import Connection, AsyncConnection # noqa: F401 +FormatFunc = Callable[[Sequence[Any], Transformer], Union[bytes, bytearray]] + class BaseCopy(Generic[ConnectionType]): def __init__(self, cursor: "BaseCursor[ConnectionType]"): @@ -39,10 +41,11 @@ class BaseCopy(Generic[ConnectionType]): self._row_mode = False # true if the user is using send_row() self._finished = False + self._format_row: FormatFunc if self.format == Format.TEXT: - self._format_copy_row = format_row_text + self._format_row = format_row_text else: - self._format_copy_row = format_row_binary + self._format_row = format_row_binary def __repr__(self) -> str: cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}" @@ -76,7 +79,7 @@ class BaseCopy(Generic[ConnectionType]): # to take care of the end-of-copy marker too self._row_mode = True - data = self._format_row(row) + data = 
self._format_row(row, self.transformer) if self.format == Format.BINARY and not self._signature_sent: yield from copy_to(self._pgconn, _binary_signature) self._signature_sent = True @@ -129,18 +132,6 @@ class BaseCopy(Generic[ConnectionType]): # Support methods - def _format_row(self, row: Sequence[Any]) -> bytes: - """Convert a Python sequence to the data to send for copy""" - out: List[Optional[bytes]] = [] - for item in row: - if item is not None: - dumper = self.transformer.get_dumper(item, self.format) - out.append(dumper.dump(item)) - else: - out.append(None) - - return self._format_copy_row(out) - def _ensure_bytes(self, data: Union[bytes, str]) -> bytes: if isinstance(data, bytes): return data @@ -245,35 +236,46 @@ class AsyncCopy(BaseCopy["AsyncConnection"]): yield data -def format_row_text(row: Sequence[Optional[bytes]]) -> bytes: - """Convert a row of adapted data to the data to send for text copy""" - return ( - b"\t".join( - _bsrepl_re.sub(_bsrepl_sub, item) if item is not None else br"\N" - for item in row - ) - + b"\n" - ) +def format_row_text(row: Sequence[Any], tx: "Transformer") -> bytes: + """Convert a row of objects to the data to send for copy""" + if not row: + return b"\n" + out: List[bytes] = [] + for item in row: + if item is not None: + dumper = tx.get_dumper(item, Format.TEXT) + b = dumper.dump(item) + out.append(_bsrepl_re.sub(_bsrepl_sub, b)) + else: + out.append(br"\N") + + out[-1] += b"\n" + return b"\t".join(out) + + +def _format_row_binary(row: Sequence[Any], tx: "Transformer") -> bytes: + """Convert a row of objects to the data to send for binary copy""" + if not row: + return b"\x00\x00" # zero columns -def format_row_binary( - row: Sequence[Optional[bytes]], - __int2_struct: struct.Struct = struct.Struct("!h"), - __int4_struct: struct.Struct = struct.Struct("!i"), -) -> bytes: - """Convert a row of adapted data to the data to send for binary copy""" out = [] - out.append(__int2_struct.pack(len(row))) + 
out.append(_pack_int2(len(row))) for item in row: if item is not None: - out.append(__int4_struct.pack(len(item))) - out.append(item) + dumper = tx.get_dumper(item, Format.BINARY) + b = dumper.dump(item) + out.append(_pack_int4(len(b))) + out.append(b) else: out.append(_binary_null) return b"".join(out) +_pack_int2 = struct.Struct("!h").pack +_pack_int4 = struct.Struct("!i").pack + _binary_signature = ( # Signature, flags, extra length b"PGCOPY\n\xff\r\n\0" @@ -300,3 +302,16 @@ def _bsrepl_sub( _bsrepl_re = re.compile(b"[\b\t\n\v\f\r\\\\]") + + +# Override it with fast object if available + +format_row_binary: FormatFunc + +if pq.__impl__ == "c": + from psycopg3_c import _psycopg3 + + format_row_binary = _psycopg3.format_row_binary + +else: + format_row_binary = _format_row_binary diff --git a/psycopg3_c/psycopg3_c/_psycopg3.pyi b/psycopg3_c/psycopg3_c/_psycopg3.pyi index d7f0840d7..0a229001c 100644 --- a/psycopg3_c/psycopg3_c/_psycopg3.pyi +++ b/psycopg3_c/psycopg3_c/_psycopg3.pyi @@ -9,14 +9,14 @@ information. Will submit a bug. from typing import Any, Iterable, List, Optional, Sequence, Tuple +from psycopg3 import proto from psycopg3.adapt import Dumper, Loader, AdaptersMap -from psycopg3.proto import AdaptContext, PQGen, PQGenConn from psycopg3.connection import BaseConnection from psycopg3.pq import Format from psycopg3.pq.proto import PGconn, PGresult -class Transformer(AdaptContext): - def __init__(self, context: Optional[AdaptContext] = None): ... +class Transformer(proto.AdaptContext): + def __init__(self, context: Optional[proto.AdaptContext] = None): ... @property def connection(self) -> Optional[BaseConnection]: ... @property @@ -37,7 +37,8 @@ class Transformer(AdaptContext): def get_loader(self, oid: int, format: Format) -> Loader: ... def register_builtin_c_adapters() -> None: ... -def connect(conninfo: str) -> PQGenConn[PGconn]: ... -def execute(pgconn: PGconn) -> PQGen[List[PGresult]]: ... +def connect(conninfo: str) -> proto.PQGenConn[PGconn]: ... 
+def execute(pgconn: PGconn) -> proto.PQGen[List[PGresult]]: ... +def format_row_binary(row: Sequence[Any], tx: proto.Transformer) -> bytes: ... # vim: set syntax=python: diff --git a/psycopg3_c/psycopg3_c/_psycopg3.pyx b/psycopg3_c/psycopg3_c/_psycopg3.pyx index 4e32e9db5..77e220751 100644 --- a/psycopg3_c/psycopg3_c/_psycopg3.pyx +++ b/psycopg3_c/psycopg3_c/_psycopg3.pyx @@ -11,7 +11,14 @@ from psycopg3_c cimport pq from psycopg3_c.pq cimport libpq from psycopg3_c._psycopg3 cimport oids +from psycopg3.pq import Format + +FORMAT_TEXT = Format.TEXT +FORMAT_BINARY = Format.BINARY + + include "_psycopg3/adapt.pyx" +include "_psycopg3/copy.pyx" include "_psycopg3/generators.pyx" include "_psycopg3/transform.pyx" diff --git a/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx b/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx new file mode 100644 index 000000000..ce97c7d39 --- /dev/null +++ b/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx @@ -0,0 +1,69 @@ +""" +C optimised functions for the copy system. + +""" + +# Copyright (C) 2020 The Psycopg Team + +from libc.string cimport memcpy +from libc.stdint cimport uint16_t, uint32_t, int32_t +from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize +from psycopg3_c._psycopg3.endian cimport htobe16, htobe32 + +cdef int32_t _binary_null = -1 + + +def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray: + """Convert a row of adapted data to the data to send for binary copy""" + cdef bytearray out = PyByteArray_FromStringAndSize("", 0) + cdef Py_ssize_t pos # position in out where we write + cdef Py_ssize_t length + cdef uint16_t rowlen + cdef uint32_t hlength + cdef char *buf + cdef char *target + cdef int i + cdef CDumper cdumper + + rowlen = len(row) + + # let's start from a nice chunk + # (larger than most fixed size, for variable ones, oh well, we will resize) + PyByteArray_Resize(out, sizeof(rowlen) + 20 * rowlen) + + # Write the number of fields as network-order 16 bits + buf = PyByteArray_AS_STRING(out) 
+ (<uint16_t *>buf)[0] = htobe16(rowlen) # this is aligned + pos = sizeof(rowlen) + + for i in range(rowlen): + item = row[i] + if item is not None: + dumper = tx.get_dumper(item, FORMAT_BINARY) + if isinstance(dumper, CDumper): + # A cdumper can resize if necessary and copy in place + cdumper = dumper + length = cdumper.cdump(item, out, pos + sizeof(hlength)) + # Also add the length of the item, before the item + hlength = htobe32(length) + target = PyByteArray_AS_STRING(out) # might have been moved by cdump + memcpy(target + pos, &hlength, sizeof(hlength)) + else: + # A Python dumper, gotta call it and extract its juices + b = dumper.dump(item) + _buffer_as_string_and_size(b, &buf, &length) + target = CDumper.ensure_size(out, pos, length + sizeof(hlength)) + hlength = htobe32(length) + memcpy(target, &hlength, sizeof(hlength)) + memcpy(target + sizeof(hlength), buf, length) + + pos += length + sizeof(hlength) + + else: + target = CDumper.ensure_size(out, pos, sizeof(_binary_null)) + memcpy(target, &_binary_null, sizeof(_binary_null)) + pos += sizeof(_binary_null) + + # Resize to the final size + PyByteArray_Resize(out, pos) + return out diff --git a/psycopg3_c/psycopg3_c/types/endian.pxd b/psycopg3_c/psycopg3_c/_psycopg3/endian.pxd similarity index 100% rename from psycopg3_c/psycopg3_c/types/endian.pxd rename to psycopg3_c/psycopg3_c/_psycopg3/endian.pxd diff --git a/psycopg3_c/psycopg3_c/_psycopg3/transform.pyx b/psycopg3_c/psycopg3_c/_psycopg3/transform.pyx index 8ac9fa4b7..bad7c0824 100644 --- a/psycopg3_c/psycopg3_c/_psycopg3/transform.pyx +++ b/psycopg3_c/psycopg3_c/_psycopg3/transform.pyx @@ -17,7 +17,6 @@ from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple from psycopg3 import errors as e -from psycopg3.pq import Format # internal structure: you are not supposed to know this.
But it's worth some diff --git a/psycopg3_c/psycopg3_c/pq/pgconn.pyx b/psycopg3_c/psycopg3_c/pq/pgconn.pyx index 0e250d487..d2560259c 100644 --- a/psycopg3_c/psycopg3_c/pq/pgconn.pyx +++ b/psycopg3_c/psycopg3_c/pq/pgconn.pyx @@ -424,12 +424,12 @@ cdef class PGconn: else: return None - def put_copy_data(self, buffer: bytes) -> int: + def put_copy_data(self, buffer) -> int: cdef int rv cdef char *cbuffer cdef Py_ssize_t length - PyBytes_AsStringAndSize(buffer, &cbuffer, &length) + _buffer_as_string_and_size(buffer, &cbuffer, &length) rv = libpq.PQputCopyData(self.pgconn_ptr, cbuffer, length) if rv < 0: raise PQerror(f"sending copy data failed: {error_message(self)}") diff --git a/psycopg3_c/psycopg3_c/types/numeric.pyx b/psycopg3_c/psycopg3_c/types/numeric.pyx index 0bcd3c4fc..928920bf8 100644 --- a/psycopg3_c/psycopg3_c/types/numeric.pyx +++ b/psycopg3_c/psycopg3_c/types/numeric.pyx @@ -10,7 +10,7 @@ from cpython.long cimport PyLong_FromString, PyLong_FromLong, PyLong_AsLongLong from cpython.long cimport PyLong_FromLongLong, PyLong_FromUnsignedLong from cpython.float cimport PyFloat_FromDouble -from endian cimport be16toh, be32toh, be64toh, htobe64 +from psycopg3_c._psycopg3.endian cimport be16toh, be32toh, be64toh, htobe64 cdef extern from "Python.h": # work around https://github.com/cython/cython/issues/3909