assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+@pytest.mark.slow
+@pytest.mark.parametrize("mode", ["row", "block", "binary"])
+def test_copy_table_across(dsn, faker, mode):
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ with psycopg.connect(dsn) as conn1, psycopg.connect(dsn) as conn2:
+ faker.table_name = sql.Identifier("copy_src")
+ conn1.execute(faker.drop_stmt)
+ conn1.execute(faker.create_stmt)
+ conn1.cursor().executemany(faker.insert_stmt, faker.records)
+
+ faker.table_name = sql.Identifier("copy_tgt")
+ conn2.execute(faker.drop_stmt)
+ conn2.execute(faker.create_stmt)
+
+ fmt = "(format binary)" if mode == "binary" else ""
+ with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1:
+ with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2:
+ if mode == "row":
+ for row in copy1.rows():
+ copy2.write_row(row)
+ else:
+ for data in copy1:
+ copy2.write(data)
+
+ recs = conn2.execute(faker.select_stmt).fetchall()
+ for got, want in zip(recs, faker.records):
+ faker.assert_record(got, want)
+
+
def py_to_raw(item, fmt):
"""Convert from Python type to the expected result from the db"""
if fmt == Format.TEXT:
assert n[0] == n[1] == n[2], f"objects leaked: {n[1] - n[0]}, {n[2] - n[1]}"
+@pytest.mark.slow
+@pytest.mark.parametrize("mode", ["row", "block", "binary"])
+async def test_copy_table_across(dsn, faker, mode):
+ faker.choose_schema(ncols=20)
+ faker.make_records(20)
+
+ connect = psycopg.AsyncConnection.connect
+ async with await connect(dsn) as conn1, await connect(dsn) as conn2:
+ faker.table_name = sql.Identifier("copy_src")
+ await conn1.execute(faker.drop_stmt)
+ await conn1.execute(faker.create_stmt)
+ await conn1.cursor().executemany(faker.insert_stmt, faker.records)
+
+ faker.table_name = sql.Identifier("copy_tgt")
+ await conn2.execute(faker.drop_stmt)
+ await conn2.execute(faker.create_stmt)
+
+ fmt = "(format binary)" if mode == "binary" else ""
+ async with conn1.cursor().copy(f"copy copy_src to stdout {fmt}") as copy1:
+ async with conn2.cursor().copy(f"copy copy_tgt from stdin {fmt}") as copy2:
+ if mode == "row":
+ async for row in copy1.rows():
+ await copy2.write_row(row)
+ else:
+ async for data in copy1:
+ await copy2.write(data)
+
+ cur = await conn2.execute(faker.select_stmt)
+ recs = await cur.fetchall()
+ for got, want in zip(recs, faker.records):
+ faker.assert_record(got, want)
+
+
async def ensure_table(cur, tabledef, name="copy_in"):
await cur.execute(f"drop table if exists {name}")
await cur.execute(f"create table {name} ({tabledef})")