data = yield from self._read_gen()
if not data:
return None
+
if self.format == Format.BINARY:
if not self._signature_sent:
if data[: len(_binary_signature)] != _binary_signature:
)
self._signature_sent = True
data = data[len(_binary_signature) :]
+
elif data == _binary_trailer:
+ yield from self._read_gen()
+ self._finished = True
return None
+
return self._parse_row(data, self.transformer)
def _write_gen(self, buffer: Union[str, bytes]) -> PQGen[None]:
for row in want:
got = copy.read()
assert got == row
+ assert (
+ conn.pgconn.transaction_status == conn.TransactionStatus.ACTIVE
+ )
assert copy.read() == b""
assert copy.read() == b""
assert copy.read() == b""
+ assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
) as copy:
assert list(copy) == want
+ assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
+
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
def test_read_rows(conn, format):
if not row:
break
rows.append(row)
+
assert rows == sample_records
+ assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
[builtins["int4"].oid, builtins["int4"].oid, builtins["text"].oid]
)
rows = list(copy.rows())
+
assert rows == sample_records
+ assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
for row in want:
got = await copy.read()
assert got == row
+ assert (
+ aconn.pgconn.transaction_status
+ == aconn.TransactionStatus.ACTIVE
+ )
assert await copy.read() == b""
assert await copy.read() == b""
assert await copy.read() == b""
+ assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
) as copy:
async for row in copy:
got.append(row)
+
assert got == want
+ assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
if not row:
break
rows.append(row)
+
assert rows == sample_records
+ assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
rows = []
async for row in copy.rows():
rows.append(row)
+
assert rows == sample_records
+ assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])