]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Added random tests for data copy
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 16 Jan 2021 02:34:32 +0000 (03:34 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 16 Jan 2021 10:26:19 +0000 (11:26 +0100)
tests/test_copy.py
tests/test_copy_async.py

index cfed04954a8ca42c1376a997a5c822f7b627f42c..bcf2ec2fcf6458eaff5f6cb22d1af21a834c2161 100644 (file)
@@ -1,3 +1,4 @@
+import gc
 import string
 import hashlib
 from io import BytesIO, StringIO
@@ -5,11 +6,13 @@ from itertools import cycle
 
 import pytest
 
+import psycopg3
 from psycopg3 import pq
 from psycopg3 import sql
 from psycopg3 import errors as e
 from psycopg3.pq import Format
 from psycopg3.oids import builtins
+from psycopg3.adapt import Format as PgFormat
 from psycopg3.types.numeric import Int4
 
 eur = "\u20ac"
@@ -493,6 +496,67 @@ def test_worker_life(conn, format, buffer):
     assert data == sample_records
 
 
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
+def test_copy_to_leaks(dsn, faker, fmt, method):
+    if fmt != Format.BINARY:
+        pytest.xfail("faker to extend to all text dumpers")
+
+    faker.format = PgFormat.from_pq(fmt)
+    faker.choose_schema(ncols=20)
+    faker.make_records(20)
+
+    n = []
+    for i in range(3):
+        with psycopg3.connect(dsn) as conn:
+            with conn.cursor(binary=fmt) as cur:
+                cur.execute(faker.drop_stmt)
+                cur.execute(faker.create_stmt)
+                cur.executemany(faker.insert_stmt, faker.records)
+
+                stmt = sql.SQL(
+                    "copy (select {} from {} order by id) to stdout (format {})"
+                ).format(
+                    sql.SQL(", ").join(faker.fields_names),
+                    faker.table_name,
+                    sql.SQL(fmt.name),
+                )
+
+                with cur.copy(stmt) as copy:
+                    types = [
+                        t.as_string(conn).replace('"', "")
+                        for t in faker.types_names
+                    ]
+                    copy.set_types(types)
+
+                    if method == "read":
+                        while 1:
+                            tmp = copy.read()
+                            if not tmp:
+                                break
+                    elif method == "iter":
+                        list(copy)
+                    elif method == "row":
+                        while 1:
+                            tmp = copy.read_row()
+                            if tmp is None:
+                                break
+                    elif method == "rows":
+                        list(copy.rows())
+
+                    tmp = None
+
+        del cur, conn
+        gc.collect()
+        gc.collect()
+        n.append(len(gc.get_objects()))
+
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
 def py_to_raw(item, fmt):
     """Convert from Python type to the expected result from the db"""
     if fmt == Format.TEXT:
index 6dced8dd9680f8d054ac4960caf5ab75f2b458d3..69d61772be2be6ad3b7cb814c6ce0f99f57e81c4 100644 (file)
@@ -1,3 +1,4 @@
+import gc
 import string
 import hashlib
 from io import BytesIO, StringIO
@@ -5,11 +6,13 @@ from itertools import cycle
 
 import pytest
 
+import psycopg3
 from psycopg3 import pq
 from psycopg3 import sql
 from psycopg3 import errors as e
 from psycopg3.pq import Format
 from psycopg3.oids import builtins
+from psycopg3.adapt import Format as PgFormat
 
 from .test_copy import sample_text, sample_binary, sample_binary_rows  # noqa
 from .test_copy import eur, sample_values, sample_records, sample_tabledef
@@ -472,6 +475,69 @@ async def test_worker_life(aconn, format, buffer):
     assert data == sample_records
 
 
+@pytest.mark.slow
+@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
+async def test_copy_to_leaks(dsn, faker, fmt, method):
+    if fmt != Format.BINARY:
+        pytest.xfail("faker to extend to all text dumpers")
+
+    faker.format = PgFormat.from_pq(fmt)
+    faker.choose_schema(ncols=20)
+    faker.make_records(20)
+
+    n = []
+    for i in range(3):
+        async with await psycopg3.AsyncConnection.connect(dsn) as conn:
+            async with await conn.cursor(binary=fmt) as cur:
+                await cur.execute(faker.drop_stmt)
+                await cur.execute(faker.create_stmt)
+                await cur.executemany(faker.insert_stmt, faker.records)
+
+                stmt = sql.SQL(
+                    "copy (select {} from {} order by id) to stdout (format {})"
+                ).format(
+                    sql.SQL(", ").join(faker.fields_names),
+                    faker.table_name,
+                    sql.SQL(fmt.name),
+                )
+
+                async with cur.copy(stmt) as copy:
+                    types = [
+                        t.as_string(conn).replace('"', "")
+                        for t in faker.types_names
+                    ]
+                    copy.set_types(types)
+
+                    if method == "read":
+                        while 1:
+                            tmp = await copy.read()
+                            if not tmp:
+                                break
+                    elif method == "iter":
+                        async for x in copy:
+                            pass
+                    elif method == "row":
+                        while 1:
+                            tmp = await copy.read_row()
+                            if tmp is None:
+                                break
+                    elif method == "rows":
+                        async for x in copy.rows():
+                            pass
+
+                    tmp = None
+
+        del cur, conn
+        gc.collect()
+        gc.collect()
+        n.append(len(gc.get_objects()))
+
+    assert (
+        n[0] == n[1] == n[2]
+    ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+
+
 async def ensure_table(cur, tabledef, name="copy_in"):
     await cur.execute(f"drop table if exists {name}")
     await cur.execute(f"create table {name} ({tabledef})")