import re
import struct
-from typing import TYPE_CHECKING, AsyncIterator, Iterator, Generic
+from typing import TYPE_CHECKING, AsyncIterator, Callable, Iterator, Generic
from typing import Any, Dict, List, Match, Optional, Sequence, Type, Union
from types import TracebackType
from . import pq
from .pq import Format, ExecStatus
-from .proto import ConnectionType, PQGen
+from .proto import ConnectionType, PQGen, Transformer
from .generators import copy_from, copy_to, copy_end
if TYPE_CHECKING:
from .cursor import BaseCursor # noqa: F401
from .connection import Connection, AsyncConnection # noqa: F401
+FormatFunc = Callable[[Sequence[Any], Transformer], Union[bytes, bytearray]]
+
class BaseCopy(Generic[ConnectionType]):
def __init__(self, cursor: "BaseCursor[ConnectionType]"):
self._row_mode = False # true if the user is using send_row()
self._finished = False
+ self._format_row: FormatFunc
if self.format == Format.TEXT:
- self._format_copy_row = format_row_text
+ self._format_row = format_row_text
else:
- self._format_copy_row = format_row_binary
+ self._format_row = format_row_binary
def __repr__(self) -> str:
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
# to take care of the end-of-copy marker too
self._row_mode = True
- data = self._format_row(row)
+ data = self._format_row(row, self.transformer)
if self.format == Format.BINARY and not self._signature_sent:
yield from copy_to(self._pgconn, _binary_signature)
self._signature_sent = True
# Support methods
- def _format_row(self, row: Sequence[Any]) -> bytes:
- """Convert a Python sequence to the data to send for copy"""
- out: List[Optional[bytes]] = []
- for item in row:
- if item is not None:
- dumper = self.transformer.get_dumper(item, self.format)
- out.append(dumper.dump(item))
- else:
- out.append(None)
-
- return self._format_copy_row(out)
-
def _ensure_bytes(self, data: Union[bytes, str]) -> bytes:
if isinstance(data, bytes):
return data
yield data
-def format_row_text(row: Sequence[Optional[bytes]]) -> bytes:
- """Convert a row of adapted data to the data to send for text copy"""
- return (
- b"\t".join(
- _bsrepl_re.sub(_bsrepl_sub, item) if item is not None else br"\N"
- for item in row
- )
- + b"\n"
- )
+def format_row_text(row: Sequence[Any], tx: "Transformer") -> bytes:
+ """Convert a row of objects to the data to send for text copy
+
+ Every item which is not None is dumped using the text dumper returned
+ by `tx.get_dumper()`, then the bytes matching `_bsrepl_re` are replaced
+ using `_bsrepl_sub`. None items are emitted as the NULL marker.
+
+ Fields are separated by a tab and the row is terminated by a newline.
+ An empty row results in a line containing only the newline.
+ """
+ # Handle the empty row here: out[-1] below needs at least one field
+ if not row:
+ return b"\n"
+ out: List[bytes] = []
+ for item in row:
+ if item is not None:
+ dumper = tx.get_dumper(item, Format.TEXT)
+ b = dumper.dump(item)
+ out.append(_bsrepl_re.sub(_bsrepl_sub, b))
+ else:
+ out.append(br"\N")
+
+ # Attach the line terminator to the last field, so that the join only
+ # has to insert the tabs between fields
+ out[-1] += b"\n"
+ return b"\t".join(out)
+
+
+def _format_row_binary(row: Sequence[Any], tx: "Transformer") -> bytes:
+ """Convert a row of objects to the data to send for binary copy
+
+ The result starts with the number of fields, packed by `_pack_int2`.
+ Every item which is not None is dumped using the binary dumper returned
+ by `tx.get_dumper()` and is emitted as its length, packed by
+ `_pack_int4`, followed by the dumped bytes. None items are emitted as
+ `_binary_null`.
+ """
+ if not row:
+ return b"\x00\x00" # zero columns
-def format_row_binary(
- row: Sequence[Optional[bytes]],
- __int2_struct: struct.Struct = struct.Struct("!h"),
- __int4_struct: struct.Struct = struct.Struct("!i"),
-) -> bytes:
- """Convert a row of adapted data to the data to send for binary copy"""
out = []
+ # The row begins with the number of fields it contains
- out.append(__int2_struct.pack(len(row)))
+ out.append(_pack_int2(len(row)))
for item in row:
if item is not None:
- out.append(__int4_struct.pack(len(item)))
- out.append(item)
+ dumper = tx.get_dumper(item, Format.BINARY)
+ b = dumper.dump(item)
+ # Each value is prefixed by its length in bytes
+ out.append(_pack_int4(len(b)))
+ out.append(b)
else:
out.append(_binary_null)
return b"".join(out)
+# Pre-bound packers for network byte order ("!") signed integers:
+# "h" is a 2-byte short, "i" is a 4-byte int
+_pack_int2 = struct.Struct("!h").pack
+_pack_int4 = struct.Struct("!i").pack
+
_binary_signature = (
# Signature, flags, extra length
b"PGCOPY\n\xff\r\n\0"
_bsrepl_re = re.compile(b"[\b\t\n\v\f\r\\\\]")
+
+
+# Choose the implementation of format_row_binary: use the function provided
+# by the psycopg3_c package if the pq implementation in use is the "c" one,
+# otherwise use the pure Python _format_row_binary defined in this module.
+
+format_row_binary: FormatFunc
+
+if pq.__impl__ == "c":
+ from psycopg3_c import _psycopg3
+
+ format_row_binary = _psycopg3.format_row_binary
+
+else:
+ format_row_binary = _format_row_binary
from typing import Any, Iterable, List, Optional, Sequence, Tuple
+from psycopg3 import proto
from psycopg3.adapt import Dumper, Loader, AdaptersMap
-from psycopg3.proto import AdaptContext, PQGen, PQGenConn
from psycopg3.connection import BaseConnection
from psycopg3.pq import Format
from psycopg3.pq.proto import PGconn, PGresult
-class Transformer(AdaptContext):
- def __init__(self, context: Optional[AdaptContext] = None): ...
+class Transformer(proto.AdaptContext):
+ def __init__(self, context: Optional[proto.AdaptContext] = None): ...
@property
def connection(self) -> Optional[BaseConnection]: ...
@property
def get_loader(self, oid: int, format: Format) -> Loader: ...
def register_builtin_c_adapters() -> None: ...
-def connect(conninfo: str) -> PQGenConn[PGconn]: ...
-def execute(pgconn: PGconn) -> PQGen[List[PGresult]]: ...
+def connect(conninfo: str) -> proto.PQGenConn[PGconn]: ...
+def execute(pgconn: PGconn) -> proto.PQGen[List[PGresult]]: ...
+def format_row_binary(row: Sequence[Any], tx: proto.Transformer) -> bytes: ...
# vim: set syntax=python:
from psycopg3_c.pq cimport libpq
from psycopg3_c._psycopg3 cimport oids
+from psycopg3.pq import Format
+
+FORMAT_TEXT = Format.TEXT
+FORMAT_BINARY = Format.BINARY
+
+
include "_psycopg3/adapt.pyx"
+include "_psycopg3/copy.pyx"
include "_psycopg3/generators.pyx"
include "_psycopg3/transform.pyx"
--- /dev/null
+"""
+C optimised functions for the copy system.
+
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from libc.string cimport memcpy
+from libc.stdint cimport uint16_t, uint32_t, int32_t
+from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize
+from psycopg3_c._psycopg3.endian cimport htobe16, htobe32
+
+cdef int32_t _binary_null = -1
+
+
+def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray:
+ """Convert a row of objects to the data to send for binary copy
+
+ The result starts with the number of fields as a 16 bits big-endian
+ integer. Every item which is not None is dumped using the binary dumper
+ returned by `tx.get_dumper()` and is written as a 32 bits big-endian
+ length followed by the dumped data. None items are written as the
+ 4 bytes of `_binary_null`.
+
+ The data is written into a single bytearray, grown as needed and
+ trimmed to the size actually used before being returned.
+ """
+ cdef bytearray out = PyByteArray_FromStringAndSize("", 0)
+ cdef Py_ssize_t pos # position in out where we write
+ cdef Py_ssize_t length
+ cdef uint16_t rowlen
+ cdef uint32_t hlength
+ cdef char *buf
+ cdef char *target
+ cdef int i
+ cdef CDumper cdumper
+
+ rowlen = len(row)
+
+ # Start from a buffer sized for the field count plus 20 bytes per field:
+ # enough for most fixed-size values; it is resized later for the
+ # variable-size ones which don't fit.
+ # NOTE(review): the result of the PyByteArray_Resize() calls in this
+ # function is not checked here - confirm that the cimported declaration
+ # propagates a failure as an exception.
+ PyByteArray_Resize(out, sizeof(rowlen) + 20 * rowlen)
+
+ # Write the number of fields as network-order 16 bits
+ buf = PyByteArray_AS_STRING(out)
+ (<uint16_t*>buf)[0] = htobe16(rowlen) # this is aligned
+ pos = sizeof(rowlen)
+
+ for i in range(rowlen):
+ item = row[i]
+ if item is not None:
+ dumper = tx.get_dumper(item, FORMAT_BINARY)
+ if isinstance(dumper, CDumper):
+ # A cdumper can resize if necessary and copy in place
+ cdumper = dumper
+ # Leave room for the length header: the data goes after it
+ length = cdumper.cdump(item, out, pos + sizeof(hlength))
+ # Also add the length of the item, before the item
+ hlength = htobe32(length)
+ target = PyByteArray_AS_STRING(out) # might have been moved by cdump
+ memcpy(target + pos, <void *>&hlength, sizeof(hlength))
+ else:
+ # A Python dumper, gotta call it and extract its juices
+ b = dumper.dump(item)
+ _buffer_as_string_and_size(b, &buf, &length)
+ # Make room for the length header and the data, then copy both
+ target = CDumper.ensure_size(out, pos, length + sizeof(hlength))
+ hlength = htobe32(length)
+ memcpy(target, <void *>&hlength, sizeof(hlength))
+ memcpy(target + sizeof(hlength), buf, length)
+
+ # In both the branches we wrote the header followed by the data
+ pos += length + sizeof(hlength)
+
+ else:
+ # A None item is represented only by the _binary_null marker
+ target = CDumper.ensure_size(out, pos, sizeof(_binary_null))
+ memcpy(target, <void *>&_binary_null, sizeof(_binary_null))
+ pos += sizeof(_binary_null)
+
+ # Resize to the final size
+ PyByteArray_Resize(out, pos)
+ return out
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
from psycopg3 import errors as e
-from psycopg3.pq import Format
# internal structure: you are not supposed to know this. But it's worth some
else:
return None
- def put_copy_data(self, buffer: bytes) -> int:
+ def put_copy_data(self, buffer) -> int:
cdef int rv
cdef char *cbuffer
cdef Py_ssize_t length
- PyBytes_AsStringAndSize(buffer, &cbuffer, &length)
+ _buffer_as_string_and_size(buffer, &cbuffer, &length)
rv = libpq.PQputCopyData(self.pgconn_ptr, cbuffer, length)
if rv < 0:
raise PQerror(f"sending copy data failed: {error_message(self)}")
from cpython.long cimport PyLong_FromLongLong, PyLong_FromUnsignedLong
from cpython.float cimport PyFloat_FromDouble
-from endian cimport be16toh, be32toh, be64toh, htobe64
+from psycopg3_c._psycopg3.endian cimport be16toh, be32toh, be64toh, htobe64
cdef extern from "Python.h":
# work around https://github.com/cython/cython/issues/3909