def __init__(self, connection):
self.conn = connection
- self.format = Format.AUTO
+ self._format = Format.BINARY
self.records = []
self._schema = None
self._makers = {}
self.table_name = sql.Identifier("fake_table")
+ @property
+ def format(self):
+ return self._format
+
+ @format.setter
+ def format(self, format):
+ if format != Format.BINARY:
+ pytest.xfail("faker to extend to all text dumpers")
+ self._format = format
+
@property
def schema(self):
if not self._schema:
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.AUTO, Format.TEXT, Format.BINARY])
def test_random(conn, faker, fmt):
- if fmt != Format.BINARY:
- pytest.xfail("faker to extend to all text dumpers")
-
faker.format = fmt
faker.choose_schema(ncols=20)
faker.make_records(50)
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
def test_copy_to_leaks(dsn, faker, fmt, method):
- if fmt != Format.BINARY:
- pytest.xfail("faker to extend to all text dumpers")
-
faker.format = PgFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+def test_copy_from_leaks(dsn, faker, fmt):
+ faker.format = PgFormat.from_pq(fmt)
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ n = []
+ for i in range(3):
+ with psycopg3.connect(dsn) as conn:
+ with conn.cursor(binary=fmt) as cur:
+ cur.execute(faker.drop_stmt)
+ cur.execute(faker.create_stmt)
+
+ stmt = sql.SQL("copy {} ({}) from stdin (format {})").format(
+ faker.table_name,
+ sql.SQL(", ").join(faker.fields_names),
+ sql.SQL(fmt.name),
+ )
+ with cur.copy(stmt) as copy:
+ for row in faker.records:
+ copy.write_row(row)
+
+ cur.execute(faker.select_stmt)
+ recs = cur.fetchall()
+
+ for got, want in zip(recs, faker.records):
+ faker.assert_record(got, want)
+
+ del recs
+
+ del cur, conn
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
def py_to_raw(item, fmt):
"""Convert from Python type to the expected result from the db"""
if fmt == Format.TEXT:
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
async def test_copy_to_leaks(dsn, faker, fmt, method):
- if fmt != Format.BINARY:
- pytest.xfail("faker to extend to all text dumpers")
-
faker.format = PgFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+async def test_copy_from_leaks(dsn, faker, fmt):
+ faker.format = PgFormat.from_pq(fmt)
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ n = []
+ for i in range(3):
+ async with await psycopg3.AsyncConnection.connect(dsn) as conn:
+ async with await conn.cursor(binary=fmt) as cur:
+ await cur.execute(faker.drop_stmt)
+ await cur.execute(faker.create_stmt)
+
+ stmt = sql.SQL("copy {} ({}) from stdin (format {})").format(
+ faker.table_name,
+ sql.SQL(", ").join(faker.fields_names),
+ sql.SQL(fmt.name),
+ )
+ async with cur.copy(stmt) as copy:
+ for row in faker.records:
+ await copy.write_row(row)
+
+ await cur.execute(faker.select_stmt)
+ recs = await cur.fetchall()
+
+ for got, want in zip(recs, faker.records):
+ faker.assert_record(got, want)
+
+ del recs
+
+ del cur, conn
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
+
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
async def ensure_table(cur, tabledef, name="copy_in"):
await cur.execute(f"drop table if exists {name}")
await cur.execute(f"create table {name} ({tabledef})")
@pytest.mark.parametrize("fmt", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
def test_leak(dsn, faker, fmt, fetch):
- if fmt != Format.BINARY:
- pytest.xfail("faker to extend to all text dumpers")
-
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)
@pytest.mark.parametrize("fmt", [Format.AUTO, Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("fetch", ["one", "many", "all", "iter"])
async def test_leak(dsn, faker, fmt, fetch):
- if fmt != Format.BINARY:
- pytest.xfail("faker to extend to all text dumpers")
-
faker.format = fmt
faker.choose_schema(ncols=5)
faker.make_records(10)