Importing it across test cases requires too silly workarounds.
Also do without attrgetter.
adapters.types.clear()
for t in types:
adapters.types.add(t)
+
+
+@pytest.fixture
+def tpc(svcconn):
+ tpc = Tpc(svcconn)
+ tpc.check_tpc()
+ tpc.clear_test_xacts()
+ tpc.make_test_table()
+ yield tpc
+ tpc.clear_test_xacts()
+
+
+class Tpc:
+ """Helper object to test two-phase transactions"""
+
+ def __init__(self, conn):
+ assert conn.autocommit
+ self.conn = conn
+
+ def check_tpc(self):
+ val = int(
+ self.conn.execute("show max_prepared_transactions").fetchone()[0]
+ )
+ if not val:
+ pytest.skip("prepared transactions disabled in the database")
+
+ def clear_test_xacts(self):
+ """Rollback all the prepared transaction in the testing db."""
+ from psycopg import sql
+
+ cur = self.conn.execute(
+ "select gid from pg_prepared_xacts where database = %s",
+ (self.conn.info.dbname,),
+ )
+ gids = [r[0] for r in cur]
+ for gid in gids:
+ self.conn.execute(sql.SQL("rollback prepared {}").format(gid))
+
+ def make_test_table(self):
+ self.conn.execute("CREATE TABLE IF NOT EXISTS test_tpc (data text)")
+ self.conn.execute("TRUNCATE test_tpc")
+
+ def count_xacts(self):
+ """Return the number of prepared xacts currently in the test db."""
+ cur = self.conn.execute(
+ """
+ select count(*) from pg_prepared_xacts
+ where database = %s""",
+ (self.conn.info.dbname,),
+ )
+ return cur.fetchone()[0]
+
+ def count_test_records(self):
+ """Return the number of records in the test table."""
+ cur = self.conn.execute("select count(*) from test_tpc")
+ return cur.fetchone()[0]
from . import dbapi20
from . import dbapi20_tpc
-from .test_tpc import tpc # noqa F401 # fixture
-
@pytest.fixture(scope="class")
def with_dsn(request, dsn):
-from operator import attrgetter
-
import pytest
import psycopg
-from psycopg import sql
def test_tpc_disabled(conn):
xids = conn.tpc_recover()
xids = [xid for xid in xids if xid.database == conn.info.dbname]
- xids.sort(key=attrgetter("gtrid"))
+ xids.sort(key=lambda x: x.gtrid)
# check the values returned
assert len(okvals) == len(xids)
x2 = psycopg.Xid.from_string("99_xxx_yyy")
str(x2) == "99_xxx_yyy"
-
-
-@pytest.fixture
-def tpc(svcconn):
- tpc = Tpc(svcconn)
- tpc.check_tpc()
- tpc.clear_test_xacts()
- tpc.make_test_table()
- yield tpc
- tpc.clear_test_xacts()
-
-
-class Tpc:
- """Helper object to test two-phase transactions"""
-
- def __init__(self, conn):
- assert conn.autocommit
- self.conn = conn
-
- def check_tpc(self):
- val = int(
- self.conn.execute("show max_prepared_transactions").fetchone()[0]
- )
- if not val:
- pytest.skip("prepared transactions disabled in the database")
-
- def clear_test_xacts(self):
- """Rollback all the prepared transaction in the testing db."""
- cur = self.conn.execute(
- "select gid from pg_prepared_xacts where database = %s",
- (self.conn.info.dbname,),
- )
- gids = [r[0] for r in cur]
- for gid in gids:
- self.conn.execute(sql.SQL("rollback prepared {}").format(gid))
-
- def make_test_table(self):
- self.conn.execute("CREATE TABLE IF NOT EXISTS test_tpc (data text)")
- self.conn.execute("TRUNCATE test_tpc")
-
- def count_xacts(self):
- """Return the number of prepared xacts currently in the test db."""
- cur = self.conn.execute(
- """
- select count(*) from pg_prepared_xacts
- where database = %s""",
- (self.conn.info.dbname,),
- )
- return cur.fetchone()[0]
-
- def count_test_records(self):
- """Return the number of records in the test table."""
- cur = self.conn.execute("select count(*) from test_tpc")
- return cur.fetchone()[0]
-from operator import attrgetter
-
import pytest
import psycopg
-from .test_tpc import tpc # noqa: F401 # fixture
-
pytestmark = [pytest.mark.asyncio]
-tpc = tpc # Silence F811 in the rest of the file
async def test_tpc_disabled(aconn):
xids = await aconn.tpc_recover()
xids = [xid for xid in xids if xid.database == aconn.info.dbname]
- xids.sort(key=attrgetter("gtrid"))
+ xids.sort(key=lambda x: x.gtrid)
# check the values returned
assert len(okvals) == len(xids)