]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
refactor(tests): make transaction sync/async tests more uniform
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Tue, 15 Aug 2023 18:29:34 +0000 (19:29 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 11 Oct 2023 21:45:38 +0000 (23:45 +0200)
tests/_test_transaction.py [new file with mode: 0644]
tests/test_transaction.py
tests/test_transaction_async.py

diff --git a/tests/_test_transaction.py b/tests/_test_transaction.py
new file mode 100644 (file)
index 0000000..005e48c
--- /dev/null
@@ -0,0 +1,72 @@
+import sys
+import pytest
+import psycopg
+
+
+# TODOCRDB: is this the expected behaviour?
+crdb_skip_external_observer = pytest.mark.crdb(
+    "skip", reason="deadlock on observer connection"
+)
+
+
+@pytest.fixture(autouse=True)
+def create_test_table(svcconn):
+    """Creates a table called 'test_table' for use in tests."""
+    cur = svcconn.cursor()
+    cur.execute("drop table if exists test_table")
+    cur.execute("create table test_table (id text primary key)")
+    yield
+    cur.execute("drop table test_table")
+
+
+def insert_row(conn, value):
+    sql = "INSERT INTO test_table VALUES (%s)"
+    if isinstance(conn, psycopg.Connection):
+        conn.cursor().execute(sql, (value,))
+    else:
+
+        async def f():
+            cur = conn.cursor()
+            await cur.execute(sql, (value,))
+
+        return f()
+
+
+def inserted(conn):
+    """Return the values inserted in the test table."""
+    sql = "SELECT * FROM test_table"
+    if isinstance(conn, psycopg.Connection):
+        rows = conn.cursor().execute(sql).fetchall()
+        return set(v for (v,) in rows)
+    else:
+
+        async def f():
+            cur = conn.cursor()
+            await cur.execute(sql)
+            rows = await cur.fetchall()
+            return set(v for (v,) in rows)
+
+        return f()
+
+
+def in_transaction(conn):
+    if conn.pgconn.transaction_status == conn.TransactionStatus.IDLE:
+        return False
+    elif conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS:
+        return True
+    else:
+        assert False, conn.pgconn.transaction_status
+
+
+def get_exc_info(exc):
+    """Return the exc info for an exception or a success if exc is None"""
+    if not exc:
+        return (None,) * 3
+    try:
+        raise exc
+    except exc:
+        return sys.exc_info()
+
+
+class ExpectedException(Exception):
+    pass
index 9391e00cf2317973f085e5b6eee7915f7e2ffb3c..d1fdcae614f0da4b0d74a93b273c2f060e37f6e8 100644 (file)
@@ -1,17 +1,14 @@
-import sys
 import logging
 from threading import Thread, Event
 
 import pytest
 
-import psycopg
 from psycopg import Rollback
 from psycopg import errors as e
 
-# TODOCRDB: is this the expected behaviour?
-crdb_skip_external_observer = pytest.mark.crdb(
-    "skip", reason="deadlock on observer connection"
-)
+from ._test_transaction import in_transaction, insert_row, inserted, get_exc_info
+from ._test_transaction import ExpectedException, crdb_skip_external_observer
+from ._test_transaction import create_test_table  # noqa  # autouse fixture
 
 
 @pytest.fixture
@@ -19,69 +16,6 @@ def conn(conn, pipeline):
     return conn
 
 
-@pytest.fixture(autouse=True)
-def create_test_table(svcconn):
-    """Creates a table called 'test_table' for use in tests."""
-    cur = svcconn.cursor()
-    cur.execute("drop table if exists test_table")
-    cur.execute("create table test_table (id text primary key)")
-    yield
-    cur.execute("drop table test_table")
-
-
-def insert_row(conn, value):
-    sql = "INSERT INTO test_table VALUES (%s)"
-    if isinstance(conn, psycopg.Connection):
-        conn.cursor().execute(sql, (value,))
-    else:
-
-        async def f():
-            cur = conn.cursor()
-            await cur.execute(sql, (value,))
-
-        return f()
-
-
-def inserted(conn):
-    """Return the values inserted in the test table."""
-    sql = "SELECT * FROM test_table"
-    if isinstance(conn, psycopg.Connection):
-        rows = conn.cursor().execute(sql).fetchall()
-        return set(v for (v,) in rows)
-    else:
-
-        async def f():
-            cur = conn.cursor()
-            await cur.execute(sql)
-            rows = await cur.fetchall()
-            return set(v for (v,) in rows)
-
-        return f()
-
-
-def in_transaction(conn):
-    if conn.pgconn.transaction_status == conn.TransactionStatus.IDLE:
-        return False
-    elif conn.pgconn.transaction_status == conn.TransactionStatus.INTRANS:
-        return True
-    else:
-        assert False, conn.pgconn.transaction_status
-
-
-def get_exc_info(exc):
-    """Return the exc info for an exception or a success if exc is None"""
-    if not exc:
-        return (None,) * 3
-    try:
-        raise exc
-    except exc:
-        return sys.exc_info()
-
-
-class ExpectedException(Exception):
-    pass
-
-
 def test_basic(conn, pipeline):
     """Basic use of transaction() to BEGIN and COMMIT a transaction."""
     assert not in_transaction(conn)
index abe5bf06315051e11d3b1e019b3718d064ced806..c72aa8cfe580442a15cf5ddd6e2bc4ec3a105b4b 100644 (file)
@@ -6,9 +6,9 @@ import pytest
 from psycopg import Rollback
 from psycopg import errors as e
 
-from .test_transaction import in_transaction, insert_row, inserted, get_exc_info
-from .test_transaction import ExpectedException, crdb_skip_external_observer
-from .test_transaction import create_test_table  # noqa  # autouse fixture
+from ._test_transaction import in_transaction, insert_row, inserted, get_exc_info
+from ._test_transaction import ExpectedException, crdb_skip_external_observer
+from ._test_transaction import create_test_table  # noqa  # autouse fixture
 
 
 @pytest.fixture