import re
import struct
-from typing import TYPE_CHECKING, AsyncIterator, Callable, Iterator, Generic
-from typing import Any, Dict, List, Match, Optional, Sequence, Type, Union
from types import TracebackType
+from typing import TYPE_CHECKING, AsyncIterator, Iterator, Generic
+from typing import Any, Dict, Match, Optional, Sequence, Type, Union
+from typing_extensions import Protocol
from . import pq
from .pq import Format, ExecStatus
from .cursor import BaseCursor # noqa: F401
from .connection import Connection, AsyncConnection # noqa: F401
-FormatFunc = Callable[[Sequence[Any], Transformer], Union[bytes, bytearray]]
+
+class FormatFunc(Protocol):
+ """The type of a function to format copy data to a bytearray."""
+
+ def __call__(
+ self,
+ row: Sequence[Any],
+ tx: Transformer,
+ out: Optional[bytearray] = None,
+ ) -> bytearray:
+ ...
class BaseCopy(Generic[ConnectionType]):
self._encoding = self.connection.client_encoding
self._signature_sent = False
self._row_mode = False # true if the user is using send_row()
+ self._write_buffer = bytearray()
+ self._write_buffer_size = 32 * 1024
self._finished = False
self._format_row: FormatFunc
# to take care of the end-of-copy marker too
self._row_mode = True
- data = self._format_row(row, self.transformer)
if self.format == Format.BINARY and not self._signature_sent:
- yield from copy_to(self._pgconn, _binary_signature)
+ self._write_buffer += _binary_signature
self._signature_sent = True
- yield from copy_to(self._pgconn, data)
+ self._format_row(row, self.transformer, self._write_buffer)
+ if len(self._write_buffer) > self._write_buffer_size:
+ yield from copy_to(self._pgconn, self._write_buffer)
+ self._write_buffer.clear()
def _finish_gen(self, error: str = "") -> PQGen[None]:
- berr = (
- error.encode(self.connection.client_encoding, "replace")
- if error
- else None
- )
- res = yield from copy_end(self._pgconn, berr)
+ if error:
+ berr = error.encode(self.connection.client_encoding, "replace")
+ res = yield from copy_end(self._pgconn, berr)
+ else:
+ if self._write_buffer:
+ yield from copy_to(self._pgconn, self._write_buffer)
+ self._write_buffer.clear()
+ res = yield from copy_end(self._pgconn, None)
+
nrows = res.command_tuples
self.cursor._rowcount = nrows if nrows is not None else -1
self._finished = True
# If we have sent no data we need to send the signature
# and the trailer
if not self._signature_sent:
- yield from copy_to(self._pgconn, _binary_signature)
- yield from copy_to(self._pgconn, _binary_trailer)
+ self._write_buffer += _binary_signature
+ self._write_buffer += _binary_trailer
elif self._row_mode:
# if we have sent data already, we have sent the signature too
# (either with the first row, or we assume that in block mode
# Write the trailer only if we are sending rows (with the
# assumption that who is copying binary data is sending the
# whole format).
- yield from copy_to(self._pgconn, _binary_trailer)
+ self._write_buffer += _binary_trailer
yield from self._finish_gen()
yield data
-def format_row_text(row: Sequence[Any], tx: "Transformer") -> bytes:
- """Convert a row of objects to the data to send for copy"""
+def format_row_text(
+ row: Sequence[Any], tx: Transformer, out: Optional[bytearray] = None
+) -> bytearray:
+ """Convert a row of objects to the data to send for copy."""
+ if out is None:
+ out = bytearray()
+
if not row:
- return b"\n"
+ out += b"\n"
+ return out
- out: List[bytes] = []
for item in row:
if item is not None:
dumper = tx.get_dumper(item, Format.TEXT)
b = dumper.dump(item)
- out.append(_bsrepl_re.sub(_bsrepl_sub, b))
+ out += _bsrepl_re.sub(_bsrepl_sub, b)
else:
- out.append(br"\N")
+ out += br"\N"
+ out += b"\t"
- out[-1] += b"\n"
- return b"\t".join(out)
+ out[-1:] = b"\n"
+ return out
-def _format_row_binary(row: Sequence[Any], tx: "Transformer") -> bytes:
- """Convert a row of objects to the data to send for binary copy"""
- if not row:
- return b"\x00\x00" # zero columns
+def _format_row_binary(
+ row: Sequence[Any], tx: Transformer, out: Optional[bytearray] = None
+) -> bytearray:
+ """Convert a row of objects to the data to send for binary copy."""
+ if out is None:
+ out = bytearray()
- out = []
- out.append(_pack_int2(len(row)))
+ out += _pack_int2(len(row))
for item in row:
if item is not None:
dumper = tx.get_dumper(item, Format.BINARY)
b = dumper.dump(item)
- out.append(_pack_int4(len(b)))
- out.append(b)
+ out += _pack_int4(len(b))
+ out += b
else:
- out.append(_binary_null)
+ out += _binary_null
- return b"".join(out)
+ return out
_pack_int2 = struct.Struct("!h").pack
from libc.string cimport memcpy
from libc.stdint cimport uint16_t, uint32_t, int32_t
from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize
+from cpython.bytearray cimport PyByteArray_AS_STRING, PyByteArray_GET_SIZE
+
from psycopg3_c._psycopg3.endian cimport htobe16, htobe32
cdef int32_t _binary_null = -1
-def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray:
+def format_row_binary(
+ row: Sequence[Any], tx: Transformer, out: bytearray = None
+) -> bytearray:
"""Convert a row of adapted data to the data to send for binary copy"""
- cdef bytearray out = PyByteArray_FromStringAndSize("", 0)
- cdef Py_ssize_t pos # position in out where we write
- cdef Py_ssize_t length
- cdef uint16_t rowlen
- cdef uint32_t hlength
- cdef char *buf
- cdef char *target
- cdef int i
- cdef CDumper cdumper
+ cdef Py_ssize_t rowlen = len(row)
+ cdef uint16_t berowlen = htobe16(rowlen)
- rowlen = len(row)
+ cdef Py_ssize_t pos # offset in 'out' where to write
+ if out is None:
+ out = PyByteArray_FromStringAndSize("", 0)
+ pos = 0
+ else:
+ pos = PyByteArray_GET_SIZE(out)
# let's start from a nice chunk
- # (larger than most fixed size, for variable ones, oh well, we will resize)
- PyByteArray_Resize(out, sizeof(rowlen) + 20 * rowlen)
+ # (larger than most fixed size; for variable ones, oh well, we'll resize it)
+ cdef char *target = CDumper.ensure_size(
+ out, pos, sizeof(berowlen) + 20 * rowlen)
# Write the number of fields as network-order 16 bits
- buf = PyByteArray_AS_STRING(out)
- (<uint16_t*>buf)[0] = htobe16(rowlen) # this is aligned
- pos = sizeof(rowlen)
+ memcpy(target, <void *>&berowlen, sizeof(berowlen))
+ pos += sizeof(berowlen)
+
+ cdef Py_ssize_t size
+ cdef uint32_t besize
+ cdef char *buf
+ cdef int i
for i in range(rowlen):
item = row[i]
dumper = tx.get_dumper(item, FORMAT_BINARY)
if isinstance(dumper, CDumper):
# A cdumper can resize if necessary and copy in place
- cdumper = dumper
- length = cdumper.cdump(item, out, pos + sizeof(hlength))
- # Also add the length of the item, before the item
- hlength = htobe32(length)
+ size = (<CDumper>dumper).cdump(item, out, pos + sizeof(besize))
+ # Also add the size of the item, before the item
+ besize = htobe32(size)
target = PyByteArray_AS_STRING(out) # might have been moved by cdump
- memcpy(target + pos, <void *>&hlength, sizeof(hlength))
+ memcpy(target + pos, <void *>&besize, sizeof(besize))
else:
# A Python dumper, gotta call it and extract its juices
b = dumper.dump(item)
- _buffer_as_string_and_size(b, &buf, &length)
- target = CDumper.ensure_size(out, pos, length + sizeof(hlength))
- hlength = htobe32(length)
- memcpy(target, <void *>&hlength, sizeof(hlength))
- memcpy(target + sizeof(hlength), buf, length)
+ _buffer_as_string_and_size(b, &buf, &size)
+ target = CDumper.ensure_size(out, pos, size + sizeof(besize))
+ besize = htobe32(size)
+ memcpy(target, <void *>&besize, sizeof(besize))
+ memcpy(target + sizeof(besize), buf, size)
- pos += length + sizeof(hlength)
+ pos += size + sizeof(besize)
else:
target = CDumper.ensure_size(out, pos, sizeof(_binary_null))