From: Daniele Varrazzo
Date: Wed, 6 Jan 2021 23:49:05 +0000 (+0100)
Subject: Accumulate lines into a buffer to send copy data in larger chunks
X-Git-Tag: 3.0.dev0~203
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=540a45f81234e079ed0fa1f7e7e57ed53b4677f4;p=thirdparty%2Fpsycopg.git

Accumulate lines into a buffer to send copy data in larger chunks
---

diff --git a/psycopg3/psycopg3/copy.py b/psycopg3/psycopg3/copy.py
index 0fbc10b35..938bd19bd 100644
--- a/psycopg3/psycopg3/copy.py
+++ b/psycopg3/psycopg3/copy.py
@@ -6,9 +6,10 @@ psycopg3 copy support
 
 import re
 import struct
-from typing import TYPE_CHECKING, AsyncIterator, Callable, Iterator, Generic
-from typing import Any, Dict, List, Match, Optional, Sequence, Type, Union
 from types import TracebackType
+from typing import TYPE_CHECKING, AsyncIterator, Iterator, Generic
+from typing import Any, Dict, Match, Optional, Sequence, Type, Union
+from typing_extensions import Protocol
 
 from . import pq
 from .pq import Format, ExecStatus
@@ -20,7 +21,17 @@ if TYPE_CHECKING:
     from .cursor import BaseCursor  # noqa: F401
     from .connection import Connection, AsyncConnection  # noqa: F401
 
-FormatFunc = Callable[[Sequence[Any], Transformer], Union[bytes, bytearray]]
+
+class FormatFunc(Protocol):
+    """The type of a function to format copy data to a bytearray."""
+
+    def __call__(
+        self,
+        row: Sequence[Any],
+        tx: Transformer,
+        out: Optional[bytearray] = None,
+    ) -> bytearray:
+        ...
 
 
 class BaseCopy(Generic[ConnectionType]):
@@ -39,6 +50,8 @@ class BaseCopy(Generic[ConnectionType]):
         self._encoding = self.connection.client_encoding
         self._signature_sent = False
         self._row_mode = False  # true if the user is using send_row()
+        self._write_buffer = bytearray()
+        self._write_buffer_size = 32 * 1024
         self._finished = False
 
         self._format_row: FormatFunc
@@ -79,20 +92,25 @@ class BaseCopy(Generic[ConnectionType]):
             # to take care of the end-of-copy marker too
             self._row_mode = True
 
-        data = self._format_row(row, self.transformer)
         if self.format == Format.BINARY and not self._signature_sent:
-            yield from copy_to(self._pgconn, _binary_signature)
+            self._write_buffer += _binary_signature
             self._signature_sent = True
 
-        yield from copy_to(self._pgconn, data)
+        self._format_row(row, self.transformer, self._write_buffer)
+        if len(self._write_buffer) > self._write_buffer_size:
+            yield from copy_to(self._pgconn, self._write_buffer)
+            self._write_buffer.clear()
 
     def _finish_gen(self, error: str = "") -> PQGen[None]:
-        berr = (
-            error.encode(self.connection.client_encoding, "replace")
-            if error
-            else None
-        )
-        res = yield from copy_end(self._pgconn, berr)
+        if error:
+            berr = error.encode(self.connection.client_encoding, "replace")
+            res = yield from copy_end(self._pgconn, berr)
+        else:
+            if self._write_buffer:
+                yield from copy_to(self._pgconn, self._write_buffer)
+                self._write_buffer.clear()
+            res = yield from copy_end(self._pgconn, None)
+
         nrows = res.command_tuples
         self.cursor._rowcount = nrows if nrows is not None else -1
         self._finished = True
@@ -117,8 +135,8 @@ class BaseCopy(Generic[ConnectionType]):
             # If we have sent no data we need to send the signature
            # and the trailer
             if not self._signature_sent:
-                yield from copy_to(self._pgconn, _binary_signature)
-                yield from copy_to(self._pgconn, _binary_trailer)
+                self._write_buffer += _binary_signature
+                self._write_buffer += _binary_trailer
             elif self._row_mode:
                 # if we have sent data already, we have sent the signature too
                 # (either with the first row, or we assume that in block mode
@@ -126,7 +144,7 @@ class BaseCopy(Generic[ConnectionType]):
                 # Write the trailer only if we are sending rows (with the
                 # assumption that who is copying binary data is sending the
                 # whole format).
-                yield from copy_to(self._pgconn, _binary_trailer)
+                self._write_buffer += _binary_trailer
 
         yield from self._finish_gen()
 
@@ -236,41 +254,48 @@ class AsyncCopy(BaseCopy["AsyncConnection"]):
             yield data
 
 
-def format_row_text(row: Sequence[Any], tx: "Transformer") -> bytes:
-    """Convert a row of objects to the data to send for copy"""
+def format_row_text(
+    row: Sequence[Any], tx: Transformer, out: Optional[bytearray] = None
+) -> bytearray:
+    """Convert a row of objects to the data to send for copy."""
+    if out is None:
+        out = bytearray()
+
     if not row:
-        return b"\n"
+        out += b"\n"
+        return out
 
-    out: List[bytes] = []
     for item in row:
         if item is not None:
             dumper = tx.get_dumper(item, Format.TEXT)
             b = dumper.dump(item)
-            out.append(_bsrepl_re.sub(_bsrepl_sub, b))
+            out += _bsrepl_re.sub(_bsrepl_sub, b)
         else:
-            out.append(br"\N")
+            out += br"\N"
+        out += b"\t"
 
-    out[-1] += b"\n"
-    return b"\t".join(out)
+    out[-1:] = b"\n"
+    return out
 
 
-def _format_row_binary(row: Sequence[Any], tx: "Transformer") -> bytes:
-    """Convert a row of objects to the data to send for binary copy"""
-    if not row:
-        return b"\x00\x00"  # zero columns
+def _format_row_binary(
+    row: Sequence[Any], tx: Transformer, out: Optional[bytearray] = None
+) -> bytearray:
+    """Convert a row of objects to the data to send for binary copy."""
+    if out is None:
+        out = bytearray()
 
-    out = []
-    out.append(_pack_int2(len(row)))
+    out += _pack_int2(len(row))
     for item in row:
         if item is not None:
             dumper = tx.get_dumper(item, Format.BINARY)
             b = dumper.dump(item)
-            out.append(_pack_int4(len(b)))
-            out.append(b)
+            out += _pack_int4(len(b))
+            out += b
         else:
-            out.append(_binary_null)
+            out += _binary_null
 
-    return b"".join(out)
+    return out
 
 
 _pack_int2 = struct.Struct("!h").pack
diff --git a/psycopg3/psycopg3/pq/pq_ctypes.py b/psycopg3/psycopg3/pq/pq_ctypes.py
index e09c7e5be..138287044 100644
--- a/psycopg3/psycopg3/pq/pq_ctypes.py
+++ b/psycopg3/psycopg3/pq/pq_ctypes.py
@@ -521,6 +521,9 @@ class PGconn:
         return None
 
     def put_copy_data(self, buffer: bytes) -> int:
+        # TODO: should be done without copy
+        if not isinstance(buffer, bytes):
+            buffer = bytes(buffer)
         rv = impl.PQputCopyData(self.pgconn_ptr, buffer, len(buffer))
         if rv < 0:
             raise PQerror(f"sending copy data failed: {error_message(self)}")
diff --git a/psycopg3_c/psycopg3_c/_psycopg3.pyi b/psycopg3_c/psycopg3_c/_psycopg3.pyi
index 128f2e9ff..1352ae480 100644
--- a/psycopg3_c/psycopg3_c/_psycopg3.pyi
+++ b/psycopg3_c/psycopg3_c/_psycopg3.pyi
@@ -38,6 +38,8 @@ class Transformer(proto.AdaptContext):
 
 def connect(conninfo: str) -> proto.PQGenConn[PGconn]: ...
 def execute(pgconn: PGconn) -> proto.PQGen[List[PGresult]]: ...
-def format_row_binary(row: Sequence[Any], tx: proto.Transformer) -> bytes: ...
+def format_row_binary(
+    row: Sequence[Any], tx: proto.Transformer, out: Optional[bytearray] = None
+) -> bytearray: ...
 
 # vim: set syntax=python:
diff --git a/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx b/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx
index ce97c7d39..a29fa96e3 100644
--- a/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx
+++ b/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx
@@ -8,33 +8,40 @@ C optimised functions for the copy system.
 from libc.string cimport memcpy
 from libc.stdint cimport uint16_t, uint32_t, int32_t
 from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize
+from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE
+
 from psycopg3_c._psycopg3.endian cimport htobe16, htobe32
 
 cdef int32_t _binary_null = -1
 
 
-def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray:
+def format_row_binary(
+    row: Sequence[Any], tx: Transformer, out: bytearray = None
+) -> bytearray:
     """Convert a row of adapted data to the data to send for binary copy"""
-    cdef bytearray out = PyByteArray_FromStringAndSize("", 0)
-    cdef Py_ssize_t pos  # position in out where we write
-    cdef Py_ssize_t length
-    cdef uint16_t rowlen
-    cdef uint32_t hlength
-    cdef char *buf
-    cdef char *target
-    cdef int i
-    cdef CDumper cdumper
+    cdef Py_ssize_t rowlen = len(row)
+    cdef uint16_t berowlen = htobe16(rowlen)
 
-    rowlen = len(row)
+    cdef Py_ssize_t pos  # offset in 'out' where to write
+    if out is None:
+        out = PyByteArray_FromStringAndSize("", 0)
+        pos = 0
+    else:
+        pos = PyByteArray_GET_SIZE(out)
 
     # let's start from a nice chunk
-    # (larger than most fixed size, for variable ones, oh well, we will resize)
-    PyByteArray_Resize(out, sizeof(rowlen) + 20 * rowlen)
+    # (larger than most fixed size; for variable ones, oh well, we'll resize it)
+    cdef char *target = CDumper.ensure_size(
+        out, pos, sizeof(berowlen) + 20 * rowlen)
 
     # Write the number of fields as network-order 16 bits
-    buf = PyByteArray_AS_STRING(out)
-    (<uint16_t *>buf)[0] = htobe16(rowlen)  # this is aligned
-    pos = sizeof(rowlen)
+    memcpy(target, &berowlen, sizeof(berowlen))
+    pos += sizeof(berowlen)
+
+    cdef Py_ssize_t size
+    cdef uint32_t besize
+    cdef char *buf
+    cdef int i
 
     for i in range(rowlen):
         item = row[i]
@@ -42,22 +49,21 @@ def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray:
             dumper = tx.get_dumper(item, FORMAT_BINARY)
             if isinstance(dumper, CDumper):
                 # A cdumper can resize if necessary and copy in place
-                cdumper = dumper
-                length = cdumper.cdump(item, out, pos + sizeof(hlength))
-                # Also add the length of the item, before the item
-                hlength = htobe32(length)
+                size = (<CDumper>dumper).cdump(item, out, pos + sizeof(besize))
+                # Also add the size of the item, before the item
+                besize = htobe32(size)
                 target = PyByteArray_AS_STRING(out)  # might have been moved by cdump
-                memcpy(target + pos, &hlength, sizeof(hlength))
+                memcpy(target + pos, &besize, sizeof(besize))
             else:
                 # A Python dumper, gotta call it and extract its juices
                 b = dumper.dump(item)
-                _buffer_as_string_and_size(b, &buf, &length)
-                target = CDumper.ensure_size(out, pos, length + sizeof(hlength))
-                hlength = htobe32(length)
-                memcpy(target, &hlength, sizeof(hlength))
-                memcpy(target + sizeof(hlength), buf, length)
+                _buffer_as_string_and_size(b, &buf, &size)
+                target = CDumper.ensure_size(out, pos, size + sizeof(besize))
+                besize = htobe32(size)
+                memcpy(target, &besize, sizeof(besize))
+                memcpy(target + sizeof(besize), buf, size)
 
-            pos += length + sizeof(hlength)
+            pos += size + sizeof(besize)
 
         else:
             target = CDumper.ensure_size(out, pos, sizeof(_binary_null))
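
For reference, here is a minimal pure-Python sketch (not part of the commit) of the
buffering pattern introduced above: formatted rows are appended to a bytearray and
pushed to the connection only once the buffer grows past a threshold, then flushed a
last time when the copy ends. The BufferedCopyWriter class and the put_copy_data
callable are illustrative stand-ins, not psycopg3 API.

class BufferedCopyWriter:
    """Accumulate formatted rows and send them in chunks of ~32 KiB."""

    def __init__(self, put_copy_data, buffer_size=32 * 1024):
        self._put_copy_data = put_copy_data  # callable taking a bytes-like object
        self._buffer = bytearray()
        self._buffer_size = buffer_size

    def write_row(self, data: bytes) -> None:
        # Append to the buffer instead of sending each row separately.
        self._buffer += data
        if len(self._buffer) > self._buffer_size:
            self._put_copy_data(self._buffer)
            self._buffer.clear()

    def finish(self) -> None:
        # Flush whatever is left before ending the COPY operation.
        if self._buffer:
            self._put_copy_data(self._buffer)
            self._buffer.clear()


sent = []
w = BufferedCopyWriter(lambda b: sent.append(bytes(b)), buffer_size=16)
for i in range(10):
    w.write_row(f"{i}\tfoo\n".encode())
w.finish()
print(len(sent))  # fewer sends than rows: the small rows are batched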
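
Similarly, a sketch of the COPY BINARY row framing that _format_row_binary and the
Cython format_row_binary build: a big-endian int2 field count, then for each field a
big-endian int4 length followed by the field bytes, with length -1 marking NULL. The
format_row helper below is illustrative only; in psycopg3 the field bytes come from
the adaptation dumpers.

import struct

_pack_int2 = struct.Struct("!h").pack
_pack_int4 = struct.Struct("!i").pack
_binary_null = _pack_int4(-1)


def format_row(values):
    out = bytearray()
    out += _pack_int2(len(values))  # number of fields
    for v in values:
        if v is None:
            out += _binary_null     # NULL marker: length -1, no data
        else:
            out += _pack_int4(len(v))  # field length in bytes
            out += v                   # field data, already dumped to bytes
    return out


print(format_row([b"abc", None]).hex())  # 000200000003616263ffffffff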