]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added C helper for binary copy formatting
author Daniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 1 Jan 2021 17:51:32 +0000 (18:51 +0100)
committer Daniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 8 Jan 2021 01:26:53 +0000 (02:26 +0100)
psycopg3/psycopg3/copy.py
psycopg3_c/psycopg3_c/_psycopg3.pyi
psycopg3_c/psycopg3_c/_psycopg3.pyx
psycopg3_c/psycopg3_c/_psycopg3/copy.pyx [new file with mode: 0644]
psycopg3_c/psycopg3_c/_psycopg3/endian.pxd [moved from psycopg3_c/psycopg3_c/types/endian.pxd with 100% similarity]
psycopg3_c/psycopg3_c/_psycopg3/transform.pyx
psycopg3_c/psycopg3_c/pq/pgconn.pyx
psycopg3_c/psycopg3_c/types/numeric.pyx

index 99959447148c6d588411e6b65ba691305112fa60..0fbc10b358b1594c1dec566208b36d34ddc76e49 100644 (file)
@@ -6,13 +6,13 @@ psycopg3 copy support
 
 import re
 import struct
-from typing import TYPE_CHECKING, AsyncIterator, Iterator, Generic
+from typing import TYPE_CHECKING, AsyncIterator, Callable, Iterator, Generic
 from typing import Any, Dict, List, Match, Optional, Sequence, Type, Union
 from types import TracebackType
 
 from . import pq
 from .pq import Format, ExecStatus
-from .proto import ConnectionType, PQGen
+from .proto import ConnectionType, PQGen, Transformer
 from .generators import copy_from, copy_to, copy_end
 
 if TYPE_CHECKING:
@@ -20,6 +20,8 @@ if TYPE_CHECKING:
     from .cursor import BaseCursor  # noqa: F401
     from .connection import Connection, AsyncConnection  # noqa: F401
 
+FormatFunc = Callable[[Sequence[Any], Transformer], Union[bytes, bytearray]]
+
 
 class BaseCopy(Generic[ConnectionType]):
     def __init__(self, cursor: "BaseCursor[ConnectionType]"):
@@ -39,10 +41,11 @@ class BaseCopy(Generic[ConnectionType]):
         self._row_mode = False  # true if the user is using send_row()
         self._finished = False
 
+        self._format_row: FormatFunc
         if self.format == Format.TEXT:
-            self._format_copy_row = format_row_text
+            self._format_row = format_row_text
         else:
-            self._format_copy_row = format_row_binary
+            self._format_row = format_row_binary
 
     def __repr__(self) -> str:
         cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
@@ -76,7 +79,7 @@ class BaseCopy(Generic[ConnectionType]):
         # to take care of the end-of-copy marker too
         self._row_mode = True
 
-        data = self._format_row(row)
+        data = self._format_row(row, self.transformer)
         if self.format == Format.BINARY and not self._signature_sent:
             yield from copy_to(self._pgconn, _binary_signature)
             self._signature_sent = True
@@ -129,18 +132,6 @@ class BaseCopy(Generic[ConnectionType]):
 
     # Support methods
 
-    def _format_row(self, row: Sequence[Any]) -> bytes:
-        """Convert a Python sequence to the data to send for copy"""
-        out: List[Optional[bytes]] = []
-        for item in row:
-            if item is not None:
-                dumper = self.transformer.get_dumper(item, self.format)
-                out.append(dumper.dump(item))
-            else:
-                out.append(None)
-
-        return self._format_copy_row(out)
-
     def _ensure_bytes(self, data: Union[bytes, str]) -> bytes:
         if isinstance(data, bytes):
             return data
@@ -245,35 +236,46 @@ class AsyncCopy(BaseCopy["AsyncConnection"]):
             yield data
 
 
-def format_row_text(row: Sequence[Optional[bytes]]) -> bytes:
-    """Convert a row of adapted data to the data to send for text copy"""
-    return (
-        b"\t".join(
-            _bsrepl_re.sub(_bsrepl_sub, item) if item is not None else br"\N"
-            for item in row
-        )
-        + b"\n"
-    )
+def format_row_text(row: Sequence[Any], tx: "Transformer") -> bytes:
+    """Convert a row of objects to the data to send for text copy"""
+    if not row:
+        return b"\n"
 
+    out: List[bytes] = []
+    for item in row:
+        if item is not None:
+            dumper = tx.get_dumper(item, Format.TEXT)
+            b = dumper.dump(item)
+            out.append(_bsrepl_re.sub(_bsrepl_sub, b))
+        else:
+            out.append(br"\N")
+
+    out[-1] += b"\n"
+    return b"\t".join(out)
+
+
+def _format_row_binary(row: Sequence[Any], tx: "Transformer") -> bytes:
+    """Convert a row of objects to the data to send for binary copy"""
+    if not row:
+        return b"\x00\x00"  # zero columns
 
-def format_row_binary(
-    row: Sequence[Optional[bytes]],
-    __int2_struct: struct.Struct = struct.Struct("!h"),
-    __int4_struct: struct.Struct = struct.Struct("!i"),
-) -> bytes:
-    """Convert a row of adapted data to the data to send for binary copy"""
     out = []
-    out.append(__int2_struct.pack(len(row)))
+    out.append(_pack_int2(len(row)))
     for item in row:
         if item is not None:
-            out.append(__int4_struct.pack(len(item)))
-            out.append(item)
+            dumper = tx.get_dumper(item, Format.BINARY)
+            b = dumper.dump(item)
+            out.append(_pack_int4(len(b)))
+            out.append(b)
         else:
             out.append(_binary_null)
 
     return b"".join(out)
 
 
+_pack_int2 = struct.Struct("!h").pack
+_pack_int4 = struct.Struct("!i").pack
+
 _binary_signature = (
     # Signature, flags, extra length
     b"PGCOPY\n\xff\r\n\0"
@@ -300,3 +302,16 @@ def _bsrepl_sub(
 
 
 _bsrepl_re = re.compile(b"[\b\t\n\v\f\r\\\\]")
+
+
+# Override format_row_binary with the C implementation, if available
+
+format_row_binary: FormatFunc
+
+if pq.__impl__ == "c":
+    from psycopg3_c import _psycopg3
+
+    format_row_binary = _psycopg3.format_row_binary
+
+else:
+    format_row_binary = _format_row_binary
index d7f0840d78ee82894175e38ad8d5ada2567a8734..0a229001c84a8407c0d516d2a6693801aad049d1 100644 (file)
@@ -9,14 +9,14 @@ information. Will submit a bug.
 
 from typing import Any, Iterable, List, Optional, Sequence, Tuple
 
+from psycopg3 import proto
 from psycopg3.adapt import Dumper, Loader, AdaptersMap
-from psycopg3.proto import AdaptContext, PQGen, PQGenConn
 from psycopg3.connection import BaseConnection
 from psycopg3.pq import Format
 from psycopg3.pq.proto import PGconn, PGresult
 
-class Transformer(AdaptContext):
-    def __init__(self, context: Optional[AdaptContext] = None): ...
+class Transformer(proto.AdaptContext):
+    def __init__(self, context: Optional[proto.AdaptContext] = None): ...
     @property
     def connection(self) -> Optional[BaseConnection]: ...
     @property
@@ -37,7 +37,8 @@ class Transformer(AdaptContext):
     def get_loader(self, oid: int, format: Format) -> Loader: ...
 
 def register_builtin_c_adapters() -> None: ...
-def connect(conninfo: str) -> PQGenConn[PGconn]: ...
-def execute(pgconn: PGconn) -> PQGen[List[PGresult]]: ...
+def connect(conninfo: str) -> proto.PQGenConn[PGconn]: ...
+def execute(pgconn: PGconn) -> proto.PQGen[List[PGresult]]: ...
+def format_row_binary(row: Sequence[Any], tx: proto.Transformer) -> bytes: ...
 
 # vim: set syntax=python:
index 4e32e9db513a80cbcadbae43805e2858b3188772..77e2207515db9f67667073bfef409673c10299ec 100644 (file)
@@ -11,7 +11,14 @@ from psycopg3_c cimport pq
 from psycopg3_c.pq cimport libpq
 from psycopg3_c._psycopg3 cimport oids
 
+from psycopg3.pq import Format
+
+FORMAT_TEXT = Format.TEXT
+FORMAT_BINARY = Format.BINARY
+
+
 include "_psycopg3/adapt.pyx"
+include "_psycopg3/copy.pyx"
 include "_psycopg3/generators.pyx"
 include "_psycopg3/transform.pyx"
 
diff --git a/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx b/psycopg3_c/psycopg3_c/_psycopg3/copy.pyx
new file mode 100644 (file)
index 0000000..ce97c7d
--- /dev/null
@@ -0,0 +1,69 @@
+"""
+C optimised functions for the copy system.
+
+"""
+
+# Copyright (C) 2020 The Psycopg Team
+
+from libc.string cimport memcpy
+from libc.stdint cimport uint16_t, uint32_t, int32_t
+from cpython.bytearray cimport PyByteArray_FromStringAndSize, PyByteArray_Resize
+from psycopg3_c._psycopg3.endian cimport htobe16, htobe32
+
+cdef int32_t _binary_null = -1
+
+
+def format_row_binary(row: Sequence[Any], tx: Transformer) -> bytearray:
+    """Convert a row of objects to the data to send for binary copy"""
+    cdef bytearray out = PyByteArray_FromStringAndSize("", 0)
+    cdef Py_ssize_t pos  # position in out where we write
+    cdef Py_ssize_t length
+    cdef uint16_t rowlen
+    cdef uint32_t hlength
+    cdef char *buf
+    cdef char *target
+    cdef int i
+    cdef CDumper cdumper
+
+    rowlen = len(row)
+
+    # Preallocate a reasonable chunk: larger than most fixed-size values;
+    # variable-size ones will cause the buffer to be resized later.
+    PyByteArray_Resize(out, sizeof(rowlen) + 20 * rowlen)
+
+    # Write the number of fields as network-order 16 bits
+    buf = PyByteArray_AS_STRING(out)
+    (<uint16_t*>buf)[0] = htobe16(rowlen)  # this is aligned
+    pos = sizeof(rowlen)
+
+    for i in range(rowlen):
+        item = row[i]
+        if item is not None:
+            dumper = tx.get_dumper(item, FORMAT_BINARY)
+            if isinstance(dumper, CDumper):
+                # A cdumper can resize if necessary and copy in place
+                cdumper = dumper
+                length = cdumper.cdump(item, out, pos + sizeof(hlength))
+                # Also add the length of the item, before the item
+                hlength = htobe32(length)
+                target = PyByteArray_AS_STRING(out)  # might have been moved by cdump
+                memcpy(target + pos, <void *>&hlength, sizeof(hlength))
+            else:
+                # A Python dumper, gotta call it and extract its juices
+                b = dumper.dump(item)
+                _buffer_as_string_and_size(b, &buf, &length)
+                target = CDumper.ensure_size(out, pos, length + sizeof(hlength))
+                hlength = htobe32(length)
+                memcpy(target, <void *>&hlength, sizeof(hlength))
+                memcpy(target + sizeof(hlength), buf, length)
+
+            pos += length + sizeof(hlength)
+
+        else:
+            target = CDumper.ensure_size(out, pos, sizeof(_binary_null))
+            memcpy(target, <void *>&_binary_null, sizeof(_binary_null))
+            pos += sizeof(_binary_null)
+
+    # Resize to the final size
+    PyByteArray_Resize(out, pos)
+    return out
index 8ac9fa4b79adb9efae0ed0505bdcaab257b20f94..bad7c0824e853edbba2199be3c446ce9531f9f7c 100644 (file)
@@ -17,7 +17,6 @@ from cpython.object cimport PyObject, PyObject_CallFunctionObjArgs
 from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
 
 from psycopg3 import errors as e
-from psycopg3.pq import Format
 
 
 # internal structure: you are not supposed to know this. But it's worth some
index 0e250d48783617484a8add0ee4bff410163a7b79..d2560259cb30ec6aa5573416b0a857bfec4797ae 100644 (file)
@@ -424,12 +424,12 @@ cdef class PGconn:
         else:
             return None
 
-    def put_copy_data(self, buffer: bytes) -> int:
+    def put_copy_data(self, buffer) -> int:
         cdef int rv
         cdef char *cbuffer
         cdef Py_ssize_t length
 
-        PyBytes_AsStringAndSize(buffer, &cbuffer, &length)
+        _buffer_as_string_and_size(buffer, &cbuffer, &length)
         rv = libpq.PQputCopyData(self.pgconn_ptr, cbuffer, length)
         if rv < 0:
             raise PQerror(f"sending copy data failed: {error_message(self)}")
index 0bcd3c4fcb12a65380923ac90be0c22b77b6125f..928920bf8ef037fd1cde1507724d019a31d8e8e5 100644 (file)
@@ -10,7 +10,7 @@ from cpython.long cimport PyLong_FromString, PyLong_FromLong, PyLong_AsLongLong
 from cpython.long cimport PyLong_FromLongLong, PyLong_FromUnsignedLong
 from cpython.float cimport PyFloat_FromDouble
 
-from endian cimport be16toh, be32toh, be64toh, htobe64
+from psycopg3_c._psycopg3.endian cimport be16toh, be32toh, be64toh, htobe64
 
 cdef extern from "Python.h":
     # work around https://github.com/cython/cython/issues/3909