]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Use natural iteration over pq.Format in parametrised tests
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 23 Sep 2021 14:30:16 +0000 (16:30 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 26 Sep 2021 19:51:39 +0000 (21:51 +0200)
tests/test_copy.py
tests/test_copy_async.py
tests/types/test_shapely.py

index 0d70140ebef8f8731fd0cbcf7ea1f9111184e511..20439e32cd4a9a85321355fc2ad90f7c08ec9b1b 100644 (file)
@@ -11,7 +11,7 @@ from psycopg import pq
 from psycopg import sql
 from psycopg import errors as e
 from psycopg.pq import Format
-from psycopg.adapt import PyFormat as PgFormat
+from psycopg.adapt import PyFormat
 from psycopg.types import TypeInfo
 from psycopg.types.hstore import register_hstore
 from psycopg.types.numeric import Int4
@@ -48,7 +48,7 @@ sample_binary_rows = [
 sample_binary = b"".join(sample_binary_rows)
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_out_read(conn, format):
     if format == pq.Format.TEXT:
         want = [row + b"\n" for row in sample_text.splitlines()]
@@ -73,7 +73,7 @@ def test_copy_out_read(conn, format):
     assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_out_iter(conn, format):
     if format == pq.Format.TEXT:
         want = [row + b"\n" for row in sample_text.splitlines()]
@@ -89,7 +89,7 @@ def test_copy_out_iter(conn, format):
     assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 @pytest.mark.parametrize("typetype", ["names", "oids"])
 def test_read_rows(conn, format, typetype):
     cur = conn.cursor()
@@ -106,7 +106,7 @@ def test_read_rows(conn, format, typetype):
     assert conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_rows(conn, format):
     cur = conn.cursor()
     with cur.copy(
@@ -136,7 +136,7 @@ def test_set_custom_type(conn, hstore):
     assert rows == [({"a": "1", "b": "2"},)]
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_out_allchars(conn, format):
     cur = conn.cursor()
     chars = list(map(chr, range(1, 256))) + [eur]
@@ -157,7 +157,7 @@ def test_copy_out_allchars(conn, format):
     assert rows == chars
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_read_row_notypes(conn, format):
     cur = conn.cursor()
     with cur.copy(
@@ -177,7 +177,7 @@ def test_read_row_notypes(conn, format):
     assert rows == ref
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_rows_notypes(conn, format):
     cur = conn.cursor()
     with cur.copy(
@@ -192,7 +192,7 @@ def test_rows_notypes(conn, format):
 
 
 @pytest.mark.parametrize("err", [-1, 1])
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_out_badntypes(conn, format, err):
     cur = conn.cursor()
     with cur.copy(
@@ -273,7 +273,7 @@ def test_copy_in_str_binary(conn):
     assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_in_empty(conn, format):
     cur = conn.cursor()
     ensure_table(cur, sample_tabledef)
@@ -284,7 +284,7 @@ def test_copy_in_empty(conn, format):
     assert cur.rowcount == 0
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_subclass_adapter(conn, format):
     if format == Format.TEXT:
         from psycopg.types.string import StrDumper as BaseDumper
@@ -309,7 +309,7 @@ def test_subclass_adapter(conn, format):
     assert rec[0] == "hellohello"
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_in_error_empty(conn, format):
     cur = conn.cursor()
     ensure_table(cur, sample_tabledef)
@@ -344,7 +344,7 @@ def test_copy_in_buffers_with_py_error(conn):
     assert conn.pgconn.transaction_status == conn.TransactionStatus.INERROR
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_in_records(conn, format):
     cur = conn.cursor()
     ensure_table(cur, sample_tabledef)
@@ -359,7 +359,7 @@ def test_copy_in_records(conn, format):
     assert data == sample_records
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_in_records_set_types(conn, format):
     cur = conn.cursor()
     ensure_table(cur, sample_tabledef)
@@ -373,7 +373,7 @@ def test_copy_in_records_set_types(conn, format):
     assert data == sample_records
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 def test_copy_in_records_binary(conn, format):
     cur = conn.cursor()
     ensure_table(cur, "col1 serial primary key, col2 int, data text")
@@ -540,7 +540,7 @@ def test_worker_life(conn, format, buffer):
 @pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
 @pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
 def test_copy_to_leaks(dsn, faker, fmt, method, retries):
-    faker.format = PgFormat.from_pq(fmt)
+    faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
 
@@ -599,7 +599,7 @@ def test_copy_to_leaks(dsn, faker, fmt, method, retries):
 @pytest.mark.slow
 @pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
 def test_copy_from_leaks(dsn, faker, fmt, retries):
-    faker.format = PgFormat.from_pq(fmt)
+    faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
 
index 6322f2289f2eb3d5a7b72efe97d55791c3977bc9..182fb7365ca52d35fd9a21d159cb006407d6a560 100644 (file)
@@ -12,7 +12,7 @@ from psycopg import sql
 from psycopg import errors as e
 from psycopg.pq import Format
 from psycopg.types import TypeInfo
-from psycopg.adapt import PyFormat as PgFormat
+from psycopg.adapt import PyFormat
 from psycopg.types.hstore import register_hstore
 from psycopg.types.numeric import Int4
 
@@ -24,7 +24,7 @@ from .test_copy import py_to_raw
 pytestmark = pytest.mark.asyncio
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_out_read(aconn, format):
     if format == pq.Format.TEXT:
         want = [row + b"\n" for row in sample_text.splitlines()]
@@ -50,7 +50,7 @@ async def test_copy_out_read(aconn, format):
     assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_out_iter(aconn, format):
     if format == pq.Format.TEXT:
         want = [row + b"\n" for row in sample_text.splitlines()]
@@ -66,7 +66,7 @@ async def test_copy_out_iter(aconn, format):
     assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 @pytest.mark.parametrize("typetype", ["names", "oids"])
 async def test_read_rows(aconn, format, typetype):
     cur = aconn.cursor()
@@ -83,7 +83,7 @@ async def test_read_rows(aconn, format, typetype):
     assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INTRANS
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_rows(aconn, format):
     cur = aconn.cursor()
     async with cur.copy(
@@ -113,7 +113,7 @@ async def test_set_custom_type(aconn, hstore):
     assert rows == [({"a": "1", "b": "2"},)]
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_out_allchars(aconn, format):
     cur = aconn.cursor()
     chars = list(map(chr, range(1, 256))) + [eur]
@@ -134,7 +134,7 @@ async def test_copy_out_allchars(aconn, format):
     assert rows == chars
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_read_row_notypes(aconn, format):
     cur = aconn.cursor()
     async with cur.copy(
@@ -154,7 +154,7 @@ async def test_read_row_notypes(aconn, format):
     assert rows == ref
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_rows_notypes(aconn, format):
     cur = aconn.cursor()
     async with cur.copy(
@@ -169,7 +169,7 @@ async def test_rows_notypes(aconn, format):
 
 
 @pytest.mark.parametrize("err", [-1, 1])
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_out_badntypes(aconn, format, err):
     cur = aconn.cursor()
     async with cur.copy(
@@ -254,7 +254,7 @@ async def test_copy_in_str_binary(aconn):
     assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_in_empty(aconn, format):
     cur = aconn.cursor()
     await ensure_table(cur, sample_tabledef)
@@ -265,7 +265,7 @@ async def test_copy_in_empty(aconn, format):
     assert cur.rowcount == 0
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_subclass_adapter(aconn, format):
     if format == Format.TEXT:
         from psycopg.types.string import StrDumper as BaseDumper
@@ -291,7 +291,7 @@ async def test_subclass_adapter(aconn, format):
     assert rec[0] == "hellohello"
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_in_error_empty(aconn, format):
     cur = aconn.cursor()
     await ensure_table(cur, sample_tabledef)
@@ -326,7 +326,7 @@ async def test_copy_in_buffers_with_py_error(aconn):
     assert aconn.pgconn.transaction_status == aconn.TransactionStatus.INERROR
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_in_records(aconn, format):
     cur = aconn.cursor()
     await ensure_table(cur, sample_tabledef)
@@ -344,7 +344,7 @@ async def test_copy_in_records(aconn, format):
     assert data == sample_records
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_in_records_set_types(aconn, format):
     cur = aconn.cursor()
     await ensure_table(cur, sample_tabledef)
@@ -361,7 +361,7 @@ async def test_copy_in_records_set_types(aconn, format):
     assert data == sample_records
 
 
-@pytest.mark.parametrize("format", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("format", Format)
 async def test_copy_in_records_binary(aconn, format):
     cur = aconn.cursor()
     await ensure_table(cur, "col1 serial primary key, col2 int, data text")
@@ -533,7 +533,7 @@ async def test_worker_life(aconn, format, buffer):
 @pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
 @pytest.mark.parametrize("method", ["read", "iter", "row", "rows"])
 async def test_copy_to_leaks(dsn, faker, fmt, method, retries):
-    faker.format = PgFormat.from_pq(fmt)
+    faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
 
@@ -592,7 +592,7 @@ async def test_copy_to_leaks(dsn, faker, fmt, method, retries):
 @pytest.mark.slow
 @pytest.mark.parametrize("fmt", [Format.TEXT, Format.BINARY])
 async def test_copy_from_leaks(dsn, faker, fmt, retries):
-    faker.format = PgFormat.from_pq(fmt)
+    faker.format = PyFormat.from_pq(fmt)
     faker.choose_schema(ncols=20)
     faker.make_records(20)
 
index 327fa9c5ddf858b0effbad3b1a3772cb0cdda06f..f9d4aa50a0143f3003caeda62201b2998169f3b6 100644 (file)
@@ -136,7 +136,7 @@ def test_write_read_shape(shapely_conn, fmt_in, fmt_out):
         assert result == SAMPLE_POLYGON
 
 
-@pytest.mark.parametrize("fmt_out", [Format.TEXT, Format.BINARY])
+@pytest.mark.parametrize("fmt_out", Format)
 def test_match_geojson(shapely_conn, fmt_out):
     SAMPLE_POINT = Point(1.2, 3.4)
     with shapely_conn.cursor(binary=fmt_out) as cur: