+import gc
import pytest
import string
from random import randrange, choice
+from psycopg import sql, errors as e
from psycopg.pq import Format
-from psycopg import errors as e
+from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
-from ..utils import eur
+from ..utils import eur, gc_collect
from ..test_copy import sample_text, sample_binary # noqa
from ..test_copy import ensure_table, sample_records
from ..test_copy import sample_tabledef as sample_tabledef_pg
assert data == [(True, True, 1, 256)]
-def copyopt(format):
- return "with binary" if format == Format.BINARY else ""
+@pytest.mark.slow
+@pytest.mark.parametrize(
+ "fmt, set_types",
+ [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
+)
+@pytest.mark.crdb_skip("copy array")
+def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types):
+ faker.format = PyFormat.from_pq(fmt)
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ def work():
+ with conn_cls.connect(dsn) as conn:
+ with conn.cursor(binary=fmt) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+
+ stmt = sql.SQL("copy {} ({}) from stdin {}").format(
+ faker.table_name,
+ sql.SQL(", ").join(faker.fields_names),
+ sql.SQL("with binary" if fmt else ""),
+ )
+ with cur.copy(stmt) as copy:
+ if set_types:
+ copy.set_types(faker.types_names)
+ for row in faker.records:
+ copy.write_row(row)
+
+ cur.execute(faker.select_stmt)
+ recs = cur.fetchall()
+
+ for got, want in zip(recs, faker.records):
+ faker.assert_record(got, want)
+
+ gc_collect()
+ n = []
+ for i in range(3):
+ work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
+
+ assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
-# TODOCRDB: random tests
+def copyopt(format):
+ return "with binary" if format == Format.BINARY else ""
+import gc
import pytest
import string
from random import randrange, choice
from psycopg.pq import Format
-from psycopg import errors as e
+from psycopg import sql, errors as e
+from psycopg.adapt import PyFormat
from psycopg.types.numeric import Int4
-from ..utils import eur
+from ..utils import eur, gc_collect
from ..test_copy import sample_text, sample_binary # noqa
from ..test_copy import sample_records
from ..test_copy_async import ensure_table
assert data == [(True, True, 1, 256)]
-# TODOCRDB: random tests
+@pytest.mark.slow
+@pytest.mark.parametrize(
+ "fmt, set_types",
+ [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
+)
+@pytest.mark.crdb_skip("copy array")
+async def test_copy_from_leaks(aconn_cls, dsn, faker, fmt, set_types):
+ faker.format = PyFormat.from_pq(fmt)
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ async def work():
+ async with await aconn_cls.connect(dsn) as conn:
+ async with conn.cursor(binary=fmt) as cur:
+ await cur.execute(faker.drop_stmt)
+ await cur.execute(faker.create_stmt)
+
+ stmt = sql.SQL("copy {} ({}) from stdin {}").format(
+ faker.table_name,
+ sql.SQL(", ").join(faker.fields_names),
+ sql.SQL("with binary" if fmt else ""),
+ )
+ async with cur.copy(stmt) as copy:
+ if set_types:
+ copy.set_types(faker.types_names)
+ for row in faker.records:
+ await copy.write_row(row)
+
+ await cur.execute(faker.select_stmt)
+ recs = await cur.fetchall()
+
+ for got, want in zip(recs, faker.records):
+ faker.assert_record(got, want)
+
+ gc_collect()
+ n = []
+ for i in range(3):
+ await work()
+ gc_collect()
+ n.append(len(gc.get_objects()))
+
+ assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"