From: Daniele Varrazzo Date: Wed, 24 Jun 2020 08:44:32 +0000 (+1200) Subject: Dropped attempt of read support X-Git-Tag: 3.0.dev0~476 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=627fb8793ce470c7ac5aaf088030ea442afa2dc4;p=thirdparty%2Fpsycopg.git Dropped attempt of read support It doesn't add anything to selecting, except that it cannot work as type info is not available on copy. Furthermore it was based on the broken assumption that copy sends generic buffers, whereas it sends entire rows, so it was uselessly complicated. --- diff --git a/psycopg3/copy.py b/psycopg3/copy.py index 96a2604e1..38c9e88f2 100644 --- a/psycopg3/copy.py +++ b/psycopg3/copy.py @@ -7,12 +7,10 @@ psycopg3 copy support import re import codecs from typing import TYPE_CHECKING, AsyncGenerator, Generator -from typing import Any, Deque, Dict, List, Match, Optional, Tuple, Type, Union +from typing import Dict, Match, Optional, Type, Union from types import TracebackType -from collections import deque from . import pq -from . import errors as e from .proto import AdaptContext from .generators import copy_from, copy_to, copy_end @@ -21,8 +19,6 @@ if TYPE_CHECKING: class BaseCopy: - _connection: Optional["BaseConnection"] - def __init__( self, context: AdaptContext, @@ -31,14 +27,11 @@ class BaseCopy: ): from .adapt import Transformer - self._connection = None + self._connection: Optional["BaseConnection"] = None self._transformer = Transformer(context) self.format = format self.pgresult = result self._finished = False - - self._partial: Deque[bytes] = deque() - self._header_seen = False self._codec: Optional[codecs.CodecInfo] = None @property @@ -65,54 +58,6 @@ class BaseCopy: self._pgresult = result self._transformer.pgresult = result - def load(self, buffer: bytes) -> List[Tuple[Any, ...]]: - if self._finished: - raise e.ProgrammingError("copy already finished") - - if self.format == pq.Format.TEXT: - return self._load_text(buffer) - else: - return self._load_binary(buffer) - - def _load_text(self, buffer: bytes) -> List[Tuple[Any, ...]]: - rows = buffer.split(b"\n") - last_row = rows.pop(-1) - - if self._partial and rows: - self._partial.append(rows[0]) - rows[0] = b"".join(self._partial) - self._partial.clear() - - if last_row: - self._partial.append(last_row) - - # If there is no result then the transformer has no info about types - load_sequence = ( - self._transformer.load_sequence - if self.pgresult is not None - else None - ) - - rv = [] - for row in rows: - if row == b"\\.": - self._finished = True - break - - values = row.split(b"\t") - prow = tuple( - _bsrepl_re.sub(_bsrepl_sub, v) if v != b"\\N" else None - for v in values - ) - rv.append( - load_sequence(prow) if load_sequence is not None else prow - ) - - return rv - - def _load_binary(self, buffer: bytes) -> List[Tuple[Any, ...]]: - raise NotImplementedError - def _ensure_bytes(self, data: Union[bytes, str]) -> bytes: if isinstance(data, bytes): return data diff --git a/tests/test_copy.py b/tests/test_copy.py index 3c690a8b8..3e256df6a 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -3,7 +3,6 @@ import pytest from psycopg3 import pq from psycopg3 import errors as e from psycopg3.adapt import Format -from psycopg3.types import builtins sample_records = [(10, 20, "hello"), (40, None, "world")] @@ -34,93 +33,6 @@ sample_binary_rows = [ sample_binary = b"".join(sample_binary_rows) -def set_sample_attributes(res, format): - attrs = [ - pq.PGresAttDesc(b"col1", 0, 0, format, builtins["int4"].oid, 0, 0), - pq.PGresAttDesc(b"col2", 0, 0, format, builtins["int4"].oid, 0, 0), - pq.PGresAttDesc(b"data", 0, 0, format, builtins["text"].oid, 0, 0), - ] - res.set_attributes(attrs) - - -@pytest.mark.xfail -@pytest.mark.parametrize( - "format, buffer", - [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")], -) -def test_load_noinfo(conn, format, buffer): - from psycopg3.copy import Copy - - copy = Copy(context=None, result=None, format=format) - records = copy.load(globals()[buffer]) - assert records == as_bytes(sample_records) - - -@pytest.mark.xfail -@pytest.mark.parametrize( - "format, buffer", - [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")], -) -def test_load(conn, format, buffer): - from psycopg3.copy import Copy - - res = conn.pgconn.make_empty_result(pq.ExecStatus.COPY_OUT) - set_sample_attributes(res, format) - - copy = Copy(context=None, result=res, format=format) - records = copy.load(globals()[buffer]) - assert records == sample_records - - -@pytest.mark.xfail -@pytest.mark.parametrize( - "format, buffer", - [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")], -) -def test_dump(conn, format, buffer): - from psycopg3.copy import Copy - - res = conn.pgconn.make_empty_result(pq.ExecStatus.COPY_OUT) - set_sample_attributes(res, format) - - copy = Copy(context=None, result=res, format=format) - assert copy.get_buffer() is None - for row in sample_records: - copy.dump(row) - assert copy.get_buffer() == globals()[buffer] - assert copy.get_buffer() is None - - -@pytest.mark.xfail -@pytest.mark.parametrize( - "format, buffer", - [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")], -) -def test_buffers(format, buffer): - from psycopg3.copy import Copy - - copy = Copy(format=format) - assert list(copy.buffers(sample_records)) == [globals()[buffer]] - - -@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY]) -def test_copy_out_read(conn, format): - cur = conn.cursor() - copy = cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") - - if format == pq.Format.TEXT: - want = [row + b"\n" for row in sample_text.splitlines()] - else: - want = sample_binary_rows - - for row in want: - got = copy.read() - assert got == row - - assert copy.read() is None - assert copy.read() is None - - @pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY]) def test_copy_out_iter(conn, format): cur = conn.cursor()