The values passed to the method will be available on the returned
object as the members `~Xid.format_id`, `~Xid.gtrid`, `~Xid.bqual`.
"""
+ self._check_tpc()
return Xid.from_parts(format_id, gtrid, bqual)
def _tpc_begin_gen(self, xid: Union[Xid, str]) -> PQGen[None]:
+ self._check_tpc()
+
if not isinstance(xid, Xid):
xid = Xid.from_string(xid)
)
self._tpc = None
+ def _check_tpc(self) -> None:
+ """Raise NotSupportedError if TPC is not supported."""
+ # TPC supported on every supported PostgreSQL version.
+ pass
+
class Connection(BaseConnection[Row]):
"""
self.wait(self._tpc_finish_gen("ROLLBACK", xid))
def tpc_recover(self) -> List[Xid]:
+ self._check_tpc()
status = self.info.transaction_status
with self.cursor(row_factory=args_row(Xid._from_record)) as cur:
cur.execute(Xid._get_recover_query())
await self.wait(self._tpc_finish_gen("rollback", xid))
async def tpc_recover(self) -> List[Xid]:
+ self._check_tpc()
status = self.info.transaction_status
async with self.cursor(row_factory=args_row(Xid._from_record)) as cur:
await cur.execute(Xid._get_recover_query())
def info(self) -> "CrdbConnectionInfo":
return CrdbConnectionInfo(self.pgconn)
+ def _check_tpc(self) -> None:
+ if self.is_crdb(self.pgconn):
+ raise e.NotSupportedError("CockroachDB doesn't support prepared statements")
+
class CrdbConnection(_CrdbConnectionMixin, Connection[Row]):
"""
import psycopg.crdb
+from psycopg import errors as e
from psycopg.crdb import CrdbConnection
import pytest
def test_connect(dsn):
- with psycopg.crdb.CrdbConnection.connect(dsn) as conn:
+ with CrdbConnection.connect(dsn) as conn:
assert isinstance(conn, CrdbConnection)
with psycopg.crdb.connect(dsn) as conn:
assert isinstance(conn, CrdbConnection)
+
+
+def test_xid(dsn):
+ with CrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ conn.xid(1, "gtrid", "bqual")
+
+
+def test_tpc_begin(dsn):
+ with CrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ conn.tpc_begin("foo")
+
+
+def test_tpc_recover(dsn):
+ with CrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ conn.tpc_recover()
import psycopg.crdb
+from psycopg import errors as e
from psycopg.crdb import AsyncCrdbConnection
import pytest
async def test_connect(dsn):
- async with await psycopg.crdb.AsyncCrdbConnection.connect(dsn) as conn:
+ async with await AsyncCrdbConnection.connect(dsn) as conn:
assert isinstance(conn, psycopg.crdb.AsyncCrdbConnection)
+
+
+async def test_xid(dsn):
+ async with await AsyncCrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ conn.xid(1, "gtrid", "bqual")
+
+
+async def test_tpc_begin(dsn):
+ async with await AsyncCrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ await conn.tpc_begin("foo")
+
+
+async def test_tpc_recover(dsn):
+ async with await AsyncCrdbConnection.connect(dsn) as conn:
+ with pytest.raises(e.NotSupportedError):
+ await conn.tpc_recover()
+from psycopg.pq import TransactionStatus
from psycopg.crdb import CrdbConnection
import pytest
def test_is_crdb(conn):
assert not CrdbConnection.is_crdb(conn)
assert not CrdbConnection.is_crdb(conn.pgconn)
+
+
+def test_tpc_on_pg_connection(conn, tpc):
+ xid = conn.xid(1, "gtrid", "bqual")
+ assert conn.info.transaction_status == TransactionStatus.IDLE
+
+ conn.tpc_begin(xid)
+ assert conn.info.transaction_status == TransactionStatus.INTRANS
+
+ cur = conn.cursor()
+ cur.execute("insert into test_tpc values ('test_tpc_commit')")
+ assert tpc.count_xacts() == 0
+ assert tpc.count_test_records() == 0
+
+ conn.tpc_prepare()
+ assert conn.info.transaction_status == TransactionStatus.IDLE
+ assert tpc.count_xacts() == 1
+ assert tpc.count_test_records() == 0
+
+ conn.tpc_commit()
+ assert conn.info.transaction_status == TransactionStatus.IDLE
+ assert tpc.count_xacts() == 0
+ assert tpc.count_test_records() == 1