@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-def test_copy_to_leaks(dsn, faker, fmt, method):
+def test_copy_to_leaks(dsn, faker, fmt, method, retries):
faker.format = PgFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
- n = []
- for i in range(3):
+ def work():
with psycopg3.connect(dsn) as conn:
with conn.cursor(binary=fmt) as cur:
cur.execute(faker.drop_stmt)
elif method == "rows":
list(copy.rows())
- tmp = None
+ for retry in retries:
+ with retry:
+ n = []
+ for i in range(3):
+ work()
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
- del cur, conn
- gc.collect()
- gc.collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
-def test_copy_from_leaks(dsn, faker, fmt):
+def test_copy_from_leaks(dsn, faker, fmt, retries):
faker.format = PgFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
- n = []
- for i in range(3):
+ def work():
with psycopg3.connect(dsn) as conn:
with conn.cursor(binary=fmt) as cur:
cur.execute(faker.drop_stmt)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- del recs
-
- del cur, conn
- gc.collect()
- gc.collect()
- n.append(len(gc.get_objects()))
+ for retry in retries:
+ with retry:
+ n = []
+ for i in range(3):
+ work()
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
assert (
n[0] == n[1] == n[2]
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
-async def test_copy_to_leaks(dsn, faker, fmt, method):
+async def test_copy_to_leaks(dsn, faker, fmt, method, retries):
faker.format = PgFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
- n = []
- for i in range(3):
+ async def work():
async with await psycopg3.AsyncConnection.connect(dsn) as conn:
async with conn.cursor(binary=fmt) as cur:
await cur.execute(faker.drop_stmt)
async for x in copy.rows():
pass
- tmp = None
+ async for retry in retries:
+ with retry:
+ n = []
+ for i in range(3):
+ await work()
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
- del cur, conn
- gc.collect()
- gc.collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
-async def test_copy_from_leaks(dsn, faker, fmt):
+async def test_copy_from_leaks(dsn, faker, fmt, retries):
faker.format = PgFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
- n = []
- for i in range(3):
+ async def work():
async with await psycopg3.AsyncConnection.connect(dsn) as conn:
async with conn.cursor(binary=fmt) as cur:
await cur.execute(faker.drop_stmt)
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)
- del recs
+ async for retry in retries:
+ with retry:
+ n = []
+ for i in range(3):
+ await work()
+ gc.collect()
+ gc.collect()
+ n.append(len(gc.get_objects()))
- del cur, conn
- gc.collect()
- gc.collect()
- n.append(len(gc.get_objects()))
-
- assert (
- n[0] == n[1] == n[2]
- ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+ assert (
+ n[0] == n[1] == n[2]
+ ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
async def ensure_table(cur, tabledef, name="copy_in"):