]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
test: add tests to verify copy across tables works
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 29 Apr 2022 21:39:36 +0000 (23:39 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 29 Apr 2022 21:41:10 +0000 (23:41 +0200)
tests/test_copy.py
tests/test_copy_async.py

index 52e3e968d8f06dcb0bcafab68adb8cc4791602bf..2d2826d793eb5ff8bd75483d4b00c39c06a65337 100644 (file)
@@ -703,6 +703,37 @@ def test_copy_from_leaks(dsn, faker, fmt, set_types):
     assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
+@pytest.mark.slow
+@pytest.mark.parametrize("mode", ["row", "block", "binary"])
+def test_copy_table_across(dsn, faker, mode):
+    faker.choose_schema(ncols=20)
+    faker.make_records(20)
+
+    with psycopg.connect(dsn) as conn1, psycopg.connect(dsn) as conn2:
+        faker.table_name = sql.Identifier("copy_src")
+        conn1.execute(faker.drop_stmt)
+        conn1.execute(faker.create_stmt)
+        conn1.cursor().executemany(faker.insert_stmt, faker.records)
+
+        faker.table_name = sql.Identifier("copy_tgt")
+        conn2.execute(faker.drop_stmt)
+        conn2.execute(faker.create_stmt)
+
+        fmt = "(format binary)" if mode == "binary" else ""
+        with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1:
+            with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2:
+                if mode == "row":
+                    for row in copy1.rows():
+                        copy2.write_row(row)
+                else:
+                    for data in copy1:
+                        copy2.write(data)
+
+        recs = conn2.execute(faker.select_stmt).fetchall()
+        for got, want in zip(recs, faker.records):
+            faker.assert_record(got, want)
+
+
 def py_to_raw(item, fmt):
     """Convert from Python type to the expected result from the db"""
     if fmt == Format.TEXT:
index 6fc33b1c20b29645a1946cd2acfbbdc24539e5ca..0c0683da84015930888e19e3411c8e2add3c48b3 100644 (file)
@@ -703,6 +703,39 @@ async def test_copy_from_leaks(dsn, faker, fmt, set_types):
     assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
 
 
+@pytest.mark.slow
+@pytest.mark.parametrize("mode", ["row", "block", "binary"])
+async def test_copy_table_across(dsn, faker, mode):
+    faker.choose_schema(ncols=20)
+    faker.make_records(20)
+
+    connect = psycopg.AsyncConnection.connect
+    async with await connect(dsn) as conn1, await connect(dsn) as conn2:
+        faker.table_name = sql.Identifier("copy_src")
+        await conn1.execute(faker.drop_stmt)
+        await conn1.execute(faker.create_stmt)
+        await conn1.cursor().executemany(faker.insert_stmt, faker.records)
+
+        faker.table_name = sql.Identifier("copy_tgt")
+        await conn2.execute(faker.drop_stmt)
+        await conn2.execute(faker.create_stmt)
+
+        fmt = "(format binary)" if mode == "binary" else ""
+        async with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1:
+            async with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2:
+                if mode == "row":
+                    async for row in copy1.rows():
+                        await copy2.write_row(row)
+                else:
+                    async for data in copy1:
+                        await copy2.write(data)
+
+        cur = await conn2.execute(faker.select_stmt)
+        recs = await cur.fetchall()
+        for got, want in zip(recs, faker.records):
+            faker.assert_record(got, want)
+
+
 async def ensure_table(cur, tabledef, name="copy_in"):
     await cur.execute(f"drop table if exists {name}")
     await cur.execute(f"create table {name} ({tabledef})")