From: Daniele Varrazzo Date: Sat, 16 Jan 2021 02:34:32 +0000 (+0100) Subject: Added random tests for data copy X-Git-Tag: 3.0.dev0~149 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=394c8f4c25bd7a29c04d92a5d2ce51daaee0e8ff;p=thirdparty%2Fpsycopg.git Added random tests for data copy --- diff --git a/tests/test_copy.py b/tests/test_copy.py index cfed04954..bcf2ec2fc 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -1,3 +1,4 @@ +import gc import string import hashlib from io import BytesIO, StringIO @@ -5,11 +6,13 @@ from itertools import cycle import pytest +import psycopg3 from psycopg3 import pq from psycopg3 import sql from psycopg3 import errors as e from psycopg3.pq import Format from psycopg3.oids import builtins +from psycopg3.adapt import Format as PgFormat from psycopg3.types.numeric import Int4 eur = "\u20ac" @@ -493,6 +496,67 @@ def test_worker_life(conn, format, buffer): assert data == sample_records +@pytest.mark.slow +@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"]) +def test_copy_to_leaks(dsn, faker, fmt, method): + if fmt != Format.BINARY: + pytest.xfail("faker to extend to all text dumpers") + + faker.format = PgFormat.from_pq(fmt) + faker.choose_schema(ncols=20) + faker.make_records(20) + + n = [] + for i in range(3): + with psycopg3.connect(dsn) as conn: + with conn.cursor(binary=fmt) as cur: + cur.execute(faker.drop_stmt) + cur.execute(faker.create_stmt) + cur.executemany(faker.insert_stmt, faker.records) + + stmt = sql.SQL( + "copy (select {} from {} order by id) to stdout (format {})" + ).format( + sql.SQL(", ").join(faker.fields_names), + faker.table_name, + sql.SQL(fmt.name), + ) + + with cur.copy(stmt) as copy: + types = [ + t.as_string(conn).replace('"', "") + for t in faker.types_names + ] + copy.set_types(types) + + if method == "read": + while 1: + tmp = copy.read() + if not tmp: + break + elif method == "iter": + list(copy) + elif method == "row": + while 1: + tmp = copy.read_row() + if tmp is None: + break + elif method == "rows": + list(copy.rows()) + + tmp = None + + del cur, conn + gc.collect() + gc.collect() + n.append(len(gc.get_objects())) + + assert ( + n[0] == n[1] == n[2] + ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" + + def py_to_raw(item, fmt): """Convert from Python type to the expected result from the db""" if fmt == Format.TEXT: diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py index 6dced8dd9..69d61772b 100644 --- a/tests/test_copy_async.py +++ b/tests/test_copy_async.py @@ -1,3 +1,4 @@ +import gc import string import hashlib from io import BytesIO, StringIO @@ -5,11 +6,13 @@ from itertools import cycle import pytest +import psycopg3 from psycopg3 import pq from psycopg3 import sql from psycopg3 import errors as e from psycopg3.pq import Format from psycopg3.oids import builtins +from psycopg3.adapt import Format as PgFormat from .test_copy import sample_text, sample_binary, sample_binary_rows # noqa from .test_copy import eur, sample_values, sample_records, sample_tabledef @@ -472,6 +475,69 @@ async def test_worker_life(aconn, format, buffer): assert data == sample_records +@pytest.mark.slow +@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY]) +@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"]) +async def test_copy_to_leaks(dsn, faker, fmt, method): + if fmt != Format.BINARY: + pytest.xfail("faker to extend to all text dumpers") + + faker.format = PgFormat.from_pq(fmt) + faker.choose_schema(ncols=20) + faker.make_records(20) + + n = [] + for i in range(3): + async with await psycopg3.AsyncConnection.connect(dsn) as conn: + async with await conn.cursor(binary=fmt) as cur: + await cur.execute(faker.drop_stmt) + await cur.execute(faker.create_stmt) + await cur.executemany(faker.insert_stmt, faker.records) + + stmt = sql.SQL( + "copy (select {} from {} order by id) to stdout (format {})" + ).format( + sql.SQL(", ").join(faker.fields_names), + faker.table_name, + sql.SQL(fmt.name), + ) + + async with cur.copy(stmt) as copy: + types = [ + t.as_string(conn).replace('"', "") + for t in faker.types_names + ] + copy.set_types(types) + + if method == "read": + while 1: + tmp = await copy.read() + if not tmp: + break + elif method == "iter": + async for x in copy: + pass + elif method == "row": + while 1: + tmp = await copy.read_row() + if tmp is None: + break + elif method == "rows": + async for x in copy.rows(): + pass + + tmp = None + + del cur, conn + gc.collect() + gc.collect() + n.append(len(gc.get_objects())) + + assert ( + n[0] == n[1] == n[2] + ), f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" + + async def ensure_table(cur, tabledef, name="copy_in"): await cur.execute(f"drop table if exists {name}") await cur.execute(f"create table {name} ({tabledef})")