+import gc
import string
import hashlib
from io import BytesIO, StringIO
import pytest
+import psycopg3
from psycopg3 import pq
from psycopg3 import sql
from psycopg3 import errors as e
from psycopg3.pq import Format
from psycopg3.oids import builtins
+from psycopg3.adapt import Format as PgFormat
from psycopg3.types.numeric import Int4
eur = "\u20ac"
assert data == sample_records
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
+def test_copy_to_leaks(dsn, faker, fmt, method):
+ if fmt != Format.BINARY:
+ pytest.xfail("faker to extend to all text dumpers")
+
+ faker.format = PgFormat.from_pq(fmt)
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ n = []
+ for i in range(3):
+ with psycopg3.connect(dsn) as conn:
+ with conn.cursor(binary=fmt) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+ cur.executemany(faker.insert_stmt, faker.records)
+
+ stmt = sql.SQL(
+ "copy (select {} from {} order by id) to stdout (format {})"
+ ).format(
+ sql.SQL(", ").join(faker.fields_names),
+ faker.table_name,
+ sql.SQL(fmt.name),
+ )
+
+ with cur.copy(stmt) as copy:
+ types = [
+ t.as_string(conn).replace('"', "")
+ for t in faker.types_names
+ ]
+ copy.set_types(types)
+
+ if method == "read":
+ while 1:
+ tmp = copy.read()
+ if not tmp:
+ break
+ elif method == "iter":
+ list(copy)
+ elif method == "row":
+ while 1:
+ tmp = copy.read_row()
+ if tmp is None:
+ break
+ elif method == "rows":
+ list(copy.rows())
+
+ tmp = None
+
+ del cur, conn
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
def py_to_raw(item, fmt):
"""Convert from Python type to the expected result from the db"""
if fmt == Format.TEXT:
+import gc
import string
import hashlib
from io import BytesIO, StringIO
import pytest
+import psycopg3
from psycopg3 import pq
from psycopg3 import sql
from psycopg3 import errors as e
from psycopg3.pq import Format
from psycopg3.oids import builtins
+from psycopg3.adapt import Format as PgFormat
from .test_copy import sample_text, sample_binary, sample_binary_rows # noqa
from .test_copy import eur, sample_values, sample_records, sample_tabledef
assert data == sample_records
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
+async def test_copy_to_leaks(dsn, faker, fmt, method):
+ if fmt != Format.BINARY:
+ pytest.xfail("faker to extend to all text dumpers")
+
+ faker.format = PgFormat.from_pq(fmt)
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ n = []
+ for i in range(3):
+ async with await psycopg3.AsyncConnection.connect(dsn) as conn:
+ async with await conn.cursor(binary=fmt) as cur:
+ await cur.execute(faker.drop_stmt)
+ await cur.execute(faker.create_stmt)
+ await cur.executemany(faker.insert_stmt, faker.records)
+
+ stmt = sql.SQL(
+ "copy (select {} from {} order by id) to stdout (format {})"
+ ).format(
+ sql.SQL(", ").join(faker.fields_names),
+ faker.table_name,
+ sql.SQL(fmt.name),
+ )
+
+ async with cur.copy(stmt) as copy:
+ types = [
+ t.as_string(conn).replace('"', "")
+ for t in faker.types_names
+ ]
+ copy.set_types(types)
+
+ if method == "read":
+ while 1:
+ tmp = await copy.read()
+ if not tmp:
+ break
+ elif method == "iter":
+ async for x in copy:
+ pass
+ elif method == "row":
+ while 1:
+ tmp = await copy.read_row()
+ if tmp is None:
+ break
+ elif method == "rows":
+ async for x in copy.rows():
+ pass
+
+ tmp = None
+
+ del cur, conn
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
async def ensure_table(cur, tabledef, name="copy_in"):
await cur.execute(f"drop table if exists {name}")
await cur.execute(f"create table {name} ({tabledef})")