From: Daniele Varrazzo Date: Fri, 29 Apr 2022 21:39:36 +0000 (+0200) Subject: test: add tests to verify copy across tables works X-Git-Tag: 3.1~134 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7c8ea085426ce409c16c7eb29523899deee98cda;p=thirdparty%2Fpsycopg.git test: add tests to verify copy across tables works --- diff --git a/tests/test_copy.py b/tests/test_copy.py index 52e3e968d..2d2826d79 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -703,6 +703,37 @@ def test_copy_from_leaks(dsn, faker, fmt, set_types): assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" +@pytest.mark.slow +@pytest.mark.parametrize("mode", ["row", "block", "binary"]) +def test_copy_table_across(dsn, faker, mode): + faker.choose_schema(ncols=20) + faker.make_records(20) + + with psycopg.connect(dsn) as conn1, psycopg.connect(dsn) as conn2: + faker.table_name = sql.Identifier("copy_src") + conn1.execute(faker.drop_stmt) + conn1.execute(faker.create_stmt) + conn1.cursor().executemany(faker.insert_stmt, faker.records) + + faker.table_name = sql.Identifier("copy_tgt") + conn2.execute(faker.drop_stmt) + conn2.execute(faker.create_stmt) + + fmt = "(format binary)" if mode == "binary" else "" + with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1: + with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2: + if mode == "row": + for row in copy1.rows(): + copy2.write_row(row) + else: + for data in copy1: + copy2.write(data) + + recs = conn2.execute(faker.select_stmt).fetchall() + for got, want in zip(recs, faker.records): + faker.assert_record(got, want) + + def py_to_raw(item, fmt): """Convert from Python type to the expected result from the db""" if fmt == Format.TEXT: diff --git a/tests/test_copy_async.py b/tests/test_copy_async.py index 6fc33b1c2..0c0683da8 100644 --- a/tests/test_copy_async.py +++ b/tests/test_copy_async.py @@ -703,6 +703,39 @@ async def test_copy_from_leaks(dsn, faker, fmt, set_types): assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}" +@pytest.mark.slow +@pytest.mark.parametrize("mode", ["row", "block", "binary"]) +async def test_copy_table_across(dsn, faker, mode): + faker.choose_schema(ncols=20) + faker.make_records(20) + + connect = psycopg.AsyncConnection.connect + async with await connect(dsn) as conn1, await connect(dsn) as conn2: + faker.table_name = sql.Identifier("copy_src") + await conn1.execute(faker.drop_stmt) + await conn1.execute(faker.create_stmt) + await conn1.cursor().executemany(faker.insert_stmt, faker.records) + + faker.table_name = sql.Identifier("copy_tgt") + await conn2.execute(faker.drop_stmt) + await conn2.execute(faker.create_stmt) + + fmt = "(format binary)" if mode == "binary" else "" + async with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1: + async with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2: + if mode == "row": + async for row in copy1.rows(): + await copy2.write_row(row) + else: + async for data in copy1: + await copy2.write(data) + + cur = await conn2.execute(faker.select_stmt) + recs = await cur.fetchall() + for got, want in zip(recs, faker.records): + faker.assert_record(got, want) + + async def ensure_table(cur, tabledef, name="copy_in"): await cur.execute(f"drop table if exists {name}") await cur.execute(f"create table {name} ({tabledef})")