git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Accumulate lines into a buffer to send copy data in larger chunks
author Daniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 6 Jan 2021 23:49:05 +0000 (00:49 +0100)
committer Daniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 8 Jan 2021 01:32:29 +0000 (02:32 +0100)
psycopg3/psycopg3/copy.py
psycopg3/psycopg3/pq/pq_ctypes.py
psycopg3_c/psycopg3_c/_psycopg3.pyi
psycopg3_c/psycopg3_c/_psycopg3/copy.pyx

index 0fbc10b358b1594c1dec566208b36d34ddc76e49..938bd19bd87448cd511816092a6ffb49b7d4f6df 100644 (file)
@@ -6,9 +6,10 @@ psycopg3 copy support
 
 import re
 import struct
-from typing import TYPE_CHECKING, AsyncIterator, Callable, Iterator, Generic
-from typing import Any, Dict, List, Match, Optional, Sequence, Type, Union
 from types import TracebackType
+from typing import TYPE_CHECKING, AsyncIterator, Iterator, Generic
+from typing import Any, Dict, Match, Optional, Sequence, Type, Union
+from typing_extensions import Protocol
 
 from . import pq
 from .pq import Format, ExecStatus
@@ -20,7 +21,17 @@ if TYPE_CHECKING:
     from .cursor import BaseCursor  # noqa: F401
     from .connection import Connection, AsyncConnection  # noqa: F401
 
-FormatFunc = Callable[[Sequence[Any], Transformer], Union[bytes, bytearray]]
+
+class FormatFunc(Protocol):
+    """The type of a function to format copy data to a bytearray."""
+
+    def __call__(
+        self,
+        row: Sequence[Any],
+        tx: Transformer,
+        out: Optional[bytearray] = None,
+    ) -> bytearray:
+        ...
 
 
 class BaseCopy(Generic[ConnectionType]):
@@ -39,6 +50,8 @@ class BaseCopy(Generic[ConnectionType]):
         self._encoding = self.connection.client_encoding
         self._signature_sent = False
         self._row_mode = False  # true if the user is using send_row()
+        self._write_buffer = bytearray()
+        self._write_buffer_size = 32 * 1024
         self._finished = False
 
         self._format_row: FormatFunc
@@ -79,20 +92,25 @@ class BaseCopy(Generic[ConnectionType]):
         # to take care of the end-of-copy marker too
         self._row_mode = True
 
-        data = self._format_row(row, self.transformer)
         if self.format == Format.BINARY and not self._signature_sent:
-            yield from copy_to(self._pgconn, _binary_signature)
+            self._write_buffer += _binary_signature
             self._signature_sent = True
 
-        yield from copy_to(self._pgconn, data)
+        self._format_row(row, self.transformer, self._write_buffer)
+        if len(self._write_buffer) > self._write_buffer_size:
+            yield from copy_to(self._pgconn, self._write_buffer)
+            self._write_buffer.clear()
 
     def _finish_gen(self, error: str = "") -> PQGen[None]:
-        berr = (
-            error.encode(self.connection.client_encoding, "replace")
-            if error
-            else None
-        )
-        res = yield from copy_end(self._pgconn, berr)
+        if error:
+            berr = error.encode(self.connection.client_encoding, "replace")
+            res = yield from copy_end(self._pgconn, berr)
+        else:
+            if self._write_buffer:
+                yield from copy_to(self._pgconn, self._write_buffer)
+                self._write_buffer.clear()
+            res = yield from copy_end(self._pgconn, None)
+
         nrows = res.command_tuples
         self.cursor._rowcount = nrows if nrows is not None else -1
         self._finished = True
@@ -117,8 +135,8 @@ class BaseCopy(Generic[ConnectionType]):
             # If we have sent no data we need to send the signature
             # and the trailer
             if not self._signature_sent:
-                yield from copy_to(self._pgconn, _binary_signature)
-                yield from copy_to(self._pgconn, _binary_trailer)
+                self._write_buffer += _binary_signature
+                self._write_buffer += _binary_trailer
             elif self._row_mode:
                 # if we have sent data already, we have sent the signature too
                 # (either with the first row, or we assume that in block mode
@@ -126,7 +144,7 @@ class BaseCopy(Generic[ConnectionType]):
                 # Write the trailer only if we are sending rows (with the
                 # assumption that who is copying binary data is sending the
                 # whole format).
-                yield from copy_to(self._pgconn, _binary_trailer)
+                self._write_buffer += _binary_trailer
 
         yield from self._finish_gen()
 
@@ -236,41 +254,48 @@ class AsyncCopy(BaseCopy["AsyncConnection"]):
             yield data
 
 
-def format_row_text(row: Sequence[Any], tx: "Transformer") -> bytes:
-    """Convert a row of objects to the data to send for copy"""
+def format_row_text(
+    row: Sequence[Any], tx: Transformer, out: Optional[bytearray] = None
+) -> bytearray:
+    """Convert a row of objects to the data to send for copy."""
+    if out is None:
+        out = bytearray()
+
     if not row:
-        return b"\n"
+        out += b"\n"
+        return out
 
-    out: List[bytes] = []
     for item in row:
         if item is not None:
             dumper = tx.get_dumper(item, Format.TEXT)
             b = dumper.dump(item)
-            out.append(_bsrepl_re.sub(_bsrepl_sub, b))
+            out += _bsrepl_re.sub(_bsrepl_sub, b)
         else:
-            out.append(br"\N")
+            out += br"\N"
+        out += b"\t"
 
-    out[-1] += b"\n"
-    return b"\t".join(out)
+    out[-1:] = b"\n"
+    return out
 
 
-def _format_row_binary(row: Sequence[Any], tx: "Transformer") -> bytes:
-    """Convert a row of objects to the data to send for binary copy"""
-    if not row:
-        return b"\x00\x00"  # zero columns
+def _format_row_binary(
+    row: Sequence[Any], tx: Transformer, out: Optional[bytearray] = None
+) -> bytearray:
+    """Convert a row of objects to the data to send for binary copy."""
+    if out is None:
+        out = bytearray()
 
-    out = []
-    out.append(_pack_int2(len(row)))
+    out += _pack_int2(len(row))
     for item in row:
         if item is not None:
             dumper = tx.get_dumper(item, Format.BINARY)
             b = dumper.dump(item)
-            out.append(_pack_int4(len(b)))
-            out.append(b)
+            out += _pack_int4(len(b))
+            out += b
         else:
-            out.append(_binary_null)
+            out += _binary_null
 
-    return b"".join(out)
+    return out
 
 
 _pack_int2 = struct.Struct("!h").pack
index e09c7e5be05ae1a44c11fd6271bd2fec32dbd1b5..13828704481ac0052c66ec014a3885439e29a602 100644 (file)
@@ -521,6 +521,9 @@ class PGconn:
             return None
 
     def put_copy_data(self, buffer: bytes) -> int:
+        # TODO: should be done without copy
+        if not isinstance(buffer, bytes):
+            buffer = bytes(buffer)
         rv = impl.PQputCopyData(self.pgconn_ptr, buffer, len(buffer))
         if rv < 0:
             raise PQerror(f"sending copy data failed: {error_message(self)}")
index 128f2e9ff28ace63f0155dc723cac641c476b287..1352ae4801f6e1d80873c1c7d4675ee1df207dc7 100644 (file)
@@ -38,6 +38,8 @@ class Transformer(proto.AdaptContext):
 
 def connect(conninfo: str) -> proto.PQGenConn[PGconn]: ...
 def execute(pgconn: PGconn) -> proto.PQGen[List[PGresult]]: ...
-def format_row_binary(row: Sequence[Any], tx: proto.Transformer) -> bytes: ...
+def format_row_binary(
+    row: Sequence[Any], tx: proto.Transformer, out: Optional[bytearray] = None
+) -> bytearray: ...
 
 # vim: set syntax=python:
index ce97c7d39a84c455dd357e533352ff64d9d4ab95..a29fa96e384728c02306e34f98f568daeeffcbe4 100644 (file)
@@ -8,33 +8,40 @@ C optimised functions for the copy system.
 from libc.string cimport memcpy
 from libc.stdint cimport uint16_t, uint32_t, int32_t
 from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize
+from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE
+
 from psycopg3_c._psycopg3.endian cimport htobe16, htobe32
 
 cdef int32_t _binary_null = -1
 
 
-def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray:
+def format_row_binary(
+    row: Sequence[Any], tx: Transformer, out: bytearray = None
+) -> bytearray:
     """Convert a row of adapted data to the data to send for binary copy"""
-    cdef bytearray out = PyByteArray_FromStringAndSize("", 0)
-    cdef Py_ssize_t pos  # position in out where we write
-    cdef Py_ssize_t length
-    cdef uint16_t rowlen
-    cdef uint32_t hlength
-    cdef char *buf
-    cdef char *target
-    cdef int i
-    cdef CDumper cdumper
+    cdef Py_ssize_t rowlen = len(row)
+    cdef uint16_t berowlen = htobe16(rowlen)
 
-    rowlen = len(row)
+    cdef Py_ssize_t pos  # offset in 'out' where to write
+    if out is None:
+        out = PyByteArray_FromStringAndSize("", 0)
+        pos = 0
+    else:
+        pos = PyByteArray_GET_SIZE(out)
 
     # let's start from a nice chunk
-    # (larger than most fixed size, for variable ones, oh well, we will resize)
-    PyByteArray_Resize(out, sizeof(rowlen) + 20 * rowlen)
+    # (larger than most fixed size; for variable ones, oh well, we'll resize it)
+    cdef char *target = CDumper.ensure_size(
+        out, pos, sizeof(berowlen) + 20 * rowlen)
 
     # Write the number of fields as network-order 16 bits
-    buf = PyByteArray_AS_STRING(out)
-    (<uint16_t*>buf)[0] = htobe16(rowlen)  # this is aligned
-    pos = sizeof(rowlen)
+    memcpy(target, <void *>&berowlen, sizeof(berowlen))
+    pos += sizeof(berowlen)
+
+    cdef Py_ssize_t size
+    cdef uint32_t besize
+    cdef char *buf
+    cdef int i
 
     for i in range(rowlen):
         item = row[i]
@@ -42,22 +49,21 @@ def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray:
             dumper = tx.get_dumper(item, FORMAT_BINARY)
             if isinstance(dumper, CDumper):
                 # A cdumper can resize if necessary and copy in place
-                cdumper = dumper
-                length = cdumper.cdump(item, out, pos + sizeof(hlength))
-                # Also add the length of the item, before the item
-                hlength = htobe32(length)
+                size = (<CDumper>dumper).cdump(item, out, pos + sizeof(besize))
+                # Also add the size of the item, before the item
+                besize = htobe32(size)
                 target = PyByteArray_AS_STRING(out)  # might have been moved by cdump
-                memcpy(target + pos, <void *>&hlength, sizeof(hlength))
+                memcpy(target + pos, <void *>&besize, sizeof(besize))
             else:
                 # A Python dumper, gotta call it and extract its juices
                 b = dumper.dump(item)
-                _buffer_as_string_and_size(b, &buf, &length)
-                target = CDumper.ensure_size(out, pos, length + sizeof(hlength))
-                hlength = htobe32(length)
-                memcpy(target, <void *>&hlength, sizeof(hlength))
-                memcpy(target + sizeof(hlength), buf, length)
+                _buffer_as_string_and_size(b, &buf, &size)
+                target = CDumper.ensure_size(out, pos, size + sizeof(besize))
+                besize = htobe32(size)
+                memcpy(target, <void *>&besize, sizeof(besize))
+                memcpy(target + sizeof(besize), buf, size)
 
-            pos += length + sizeof(hlength)
+            pos += size + sizeof(besize)
 
         else:
             target = CDumper.ensure_size(out, pos, sizeof(_binary_null))