+# WARNING: this file is auto-generated by 'async_to_sync.py'
+# from the original file 'test_copy_async.py'
+# DO NOT CHANGE! Change the original file instead.
import string
import hashlib
from io import BytesIO, StringIO
def test_read_rows(conn, format, typetype):
cur = conn.cursor()
with cur.copy(
- f"""copy (
- select 10::int4, 'hello'::text, '{{0.0,1.0}}'::float8[]
- ) to stdout (format {format.name})"""
+ """copy (
+ select 10::int4, 'hello'::text, '{0.0,1.0}'::float8[]
+ ) to stdout (format %s)"""
+ % format.name
) as copy:
copy.set_types(["int4", "text", "float8[]"])
row = copy.read_row()
break
rows.append(row)
- ref = [tuple(py_to_raw(i, format) for i in record) for record in sample_records]
+ ref = [tuple((py_to_raw(i, format) for i in record)) for record in sample_records]
assert rows == ref
cur = conn.cursor()
with cur.copy(f"copy ({sample_values}) to stdout (format {format.name})") as copy:
rows = list(copy.rows())
- ref = [tuple(py_to_raw(i, format) for i in record) for record in sample_records]
+ ref = [tuple((py_to_raw(i, format) for i in record)) for record in sample_records]
assert rows == ref
@pytest.mark.parametrize(
- "format, buffer",
- [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")],
+ "format, buffer", [(Format.TEXT, "sample_text"), (Format.BINARY, "sample_binary")]
)
def test_copy_in_buffers(conn, format, buffer):
cur = conn.cursor()
with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
copy.write(globals()[buffer])
- data = cur.execute("select * from copy_in order by 1").fetchall()
+ cur.execute("select * from copy_in order by 1")
+ data = cur.fetchall()
assert data == sample_records
with cur.copy("copy copy_in from stdin (format text)") as copy:
copy.write(sample_text.decode())
- data = cur.execute("select * from copy_in order by 1").fetchall()
+ cur.execute("select * from copy_in order by 1")
+ data = cur.fetchall()
assert data == sample_records
def test_copy_big_size_record(conn):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
- data = "".join(chr(randrange(1, 256)) for i in range(10 * 1024 * 1024))
+ data = "".join((chr(randrange(1, 256)) for i in range(10 * 1024 * 1024)))
with cur.copy("copy copy_in (data) from stdin") as copy:
copy.write_row([data])
cur.execute("select data from copy_in limit 1")
- assert cur.fetchone()[0] == data
+ assert cur.fetchone() == (data,)
@pytest.mark.slow
def test_copy_big_size_block(conn, pytype):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
- data = "".join(choice(string.ascii_letters) for i in range(10 * 1024 * 1024))
+ data = "".join((choice(string.ascii_letters) for i in range(10 * 1024 * 1024)))
copy_data = data + "\n" if pytype is str else pytype(data.encode() + b"\n")
with cur.copy("copy copy_in (data) from stdin") as copy:
copy.write(copy_data)
cur.execute("select data from copy_in limit 1")
- assert cur.fetchone()[0] == data
+ assert cur.fetchone() == (data,)
@pytest.mark.parametrize("format", Format)
with cur.copy(f"copy copy_in (data) from stdin (format {format.name})") as copy:
copy.write_row(("hello",))
- rec = cur.execute("select data from copy_in").fetchone()
+ cur.execute("select data from copy_in")
+ rec = cur.fetchone()
assert rec[0] == "hellohello"
with cur.copy(f"copy copy_in from stdin (format {format.name})") as copy:
for row in sample_records:
if format == Format.BINARY:
- row2 = tuple(Int4(i) if isinstance(i, int) else i for i in row)
+ row2 = tuple((Int4(i) if isinstance(i, int) else i for i in row))
row = row2 # type: ignore[assignment]
copy.write_row(row)
- data = cur.execute("select * from copy_in order by 1").fetchall()
+ cur.execute("select * from copy_in order by 1")
+ data = cur.fetchall()
assert data == sample_records
for row in sample_records:
copy.write_row(row)
- data = cur.execute("select * from copy_in order by 1").fetchall()
+ cur.execute("select * from copy_in order by 1")
+ data = cur.fetchall()
assert data == sample_records
for row in sample_records:
copy.write_row((None, row[2]))
- data = cur.execute("select * from copy_in order by 1").fetchall()
+ cur.execute("select * from copy_in order by 1")
+ data = cur.fetchall()
assert data == [(1, None, "hello"), (2, None, "world")]
copy.write_row((i, None, chr(i)))
copy.write_row((ord(eur), None, eur))
- data = cur.execute(
+ cur.execute(
"""
select col1 = ascii(data), col2 is null, length(data), count(*)
from copy_in group by 1, 2, 3
"""
- ).fetchall()
+ )
+ data = cur.fetchall()
assert data == [(True, True, 1, 256)]
assert copy.writer._worker
assert not copy.writer._worker
- data = cur.execute("select * from copy_in order by 1").fetchall()
+ cur.execute("select * from copy_in order by 1")
+ data = cur.fetchall()
assert data == sample_records
assert copy.writer is writer
copy.write(globals()[buffer])
- data = cur.execute("select * from copy_in order by 1").fetchall()
+ cur.execute("select * from copy_in order by 1")
+ data = cur.fetchall()
assert data == sample_records
@pytest.mark.slow
@pytest.mark.parametrize(
- "fmt, set_types",
- [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
+ "fmt, set_types", [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)]
)
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
def test_copy_to_leaks(conn_cls, dsn, faker, fmt, set_types, method):
@pytest.mark.slow
@pytest.mark.parametrize(
- "fmt, set_types",
- [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)],
+ "fmt, set_types", [(Format.TEXT, True), (Format.TEXT, False), (Format.BINARY, True)]
)
def test_copy_from_leaks(conn_cls, dsn, faker, fmt, set_types):
faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
- with conn_cls.connect(dsn) as conn1, conn_cls.connect(dsn) as conn2:
+ connect = conn_cls.connect
+ with connect(dsn) as conn1, connect(dsn) as conn2:
faker.table_name = sql.Identifier("copy_src")
conn1.execute(faker.drop_stmt)
conn1.execute(faker.create_stmt)
for data in copy1:
copy2.write(data)
- recs = conn2.execute(faker.select_stmt).fetchall()
+ cur = conn2.execute(faker.select_stmt)
+ recs = cur.fetchall()
for got, want in zip(recs, faker.records):
faker.assert_record(got, want)