from psycopg import sql
from psycopg import errors as e
from psycopg.pq import Format
-from psycopg.adapt import PyFormat as PgFormat
+from psycopg.adapt import PyFormat
from psycopg.types import TypeInfo
from psycopg.types.hstore import register_hstore
from psycopg.types.numeric import Int4
sample_binary = b"".join(sample_binary_rows)
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_out_read(conn, format):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_out_iter(conn, format):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
@pytest.mark.parametrize("typetype", ["names", "oids"])
def test_read_rows(conn, format, typetype):
cur = conn.cursor()
assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_rows(conn, format):
cur = conn.cursor()
with cur.copy(
assert rows == [({"a": "1", "b": "2"},)]
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_out_allchars(conn, format):
cur = conn.cursor()
chars = list(map(chr, range(1, 256))) + [eur]
assert rows == chars
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_read_row_notypes(conn, format):
cur = conn.cursor()
with cur.copy(
assert rows == ref
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_rows_notypes(conn, format):
cur = conn.cursor()
with cur.copy(
@pytest.mark.parametrize("err", [-1, 1])
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_out_badntypes(conn, format, err):
cur = conn.cursor()
with cur.copy(
assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_in_empty(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
assert cur.rowcount == 0
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_subclass_adapter(conn, format):
if format == Format.TEXT:
from psycopg.types.string import StrDumper as BaseDumper
assert rec[0] == "hellohello"
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_in_error_empty(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_in_records(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_in_records_set_types(conn, format):
cur = conn.cursor()
ensure_table(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
def test_copy_in_records_binary(conn, format):
cur = conn.cursor()
ensure_table(cur, "col1 serial primary key, col2 int, data text")
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
def test_copy_to_leaks(dsn, faker, fmt, method, retries):
- faker.format = PgFormat.from_pq(fmt)
+ faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
def test_copy_from_leaks(dsn, faker, fmt, retries):
- faker.format = PgFormat.from_pq(fmt)
+ faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
from psycopg import errors as e
from psycopg.pq import Format
from psycopg.types import TypeInfo
-from psycopg.adapt import PyFormat as PgFormat
+from psycopg.adapt import PyFormat
from psycopg.types.hstore import register_hstore
from psycopg.types.numeric import Int4
pytestmark = pytest.mark.asyncio
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_out_read(aconn, format):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_out_iter(aconn, format):
if format == pq.Format.TEXT:
want = [row + b"\n" for row in sample_text.splitlines()]
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
@pytest.mark.parametrize("typetype", ["names", "oids"])
async def test_read_rows(aconn, format, typetype):
cur = aconn.cursor()
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_rows(aconn, format):
cur = aconn.cursor()
async with cur.copy(
assert rows == [({"a": "1", "b": "2"},)]
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_out_allchars(aconn, format):
cur = aconn.cursor()
chars = list(map(chr, range(1, 256))) + [eur]
assert rows == chars
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_read_row_notypes(aconn, format):
cur = aconn.cursor()
async with cur.copy(
assert rows == ref
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_rows_notypes(aconn, format):
cur = aconn.cursor()
async with cur.copy(
@pytest.mark.parametrize("err", [-1, 1])
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_out_badntypes(aconn, format, err):
cur = aconn.cursor()
async with cur.copy(
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_in_empty(aconn, format):
cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
assert cur.rowcount == 0
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_subclass_adapter(aconn, format):
if format == Format.TEXT:
from psycopg.types.string import StrDumper as BaseDumper
assert rec[0] == "hellohello"
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_in_error_empty(aconn, format):
cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_in_records(aconn, format):
cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_in_records_set_types(aconn, format):
cur = aconn.cursor()
await ensure_table(cur, sample_tabledef)
assert data == sample_records
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
async def test_copy_in_records_binary(aconn, format):
cur = aconn.cursor()
await ensure_table(cur, "col1 serial primary key, col2 int, data text")
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
@pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
async def test_copy_to_leaks(dsn, faker, fmt, method, retries):
- faker.format = PgFormat.from_pq(fmt)
+ faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
@pytest.mark.slow
@pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
async def test_copy_from_leaks(dsn, faker, fmt, retries):
- faker.format = PgFormat.from_pq(fmt)
+ faker.format = PyFormat.from_pq(fmt)
faker.choose_schema(ncols=20)
faker.make_records(20)
assert result == SAMPLE_POLYGON
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", Format)
def test_match_geojson(shapely_conn, fmt_out):
SAMPLE_POINT = Point(1.2, 3.4)
with shapely_conn.cursor(binary=fmt_out) as cur: