--- /dev/null
+.. change::
+ :tags: postgresql, bug
+ :tickets: 13229
+
+ Improve handling of two phase transaction identifiers for PostgreSQL
+ when the identifier is provided by the user.
+ As part of this change the psycopg dialect was updated to use the DBAPI
+ two phase transaction API instead of executing the SQL directly.
dbapi_connection.autocommit = before_autocommit
return True
+
+ def do_begin_twophase(self, connection, xid):
+ connection.connection.tpc_begin(xid)
+
+ def do_prepare_twophase(self, connection, xid):
+ connection.connection.tpc_prepare()
+
+ def _do_twophase(self, dbapi_conn, operation, xid, recover=False):
+ if recover:
+ if not self._twophase_idle_check(dbapi_conn):
+ dbapi_conn.rollback()
+ operation(xid)
+ else:
+ operation()
+
+ def _twophase_idle_check(self, dbapi_conn):
+ raise NotImplementedError
+
+ def do_rollback_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ dbapi_conn = connection.connection.dbapi_connection
+ self._do_twophase(
+ dbapi_conn, dbapi_conn.tpc_rollback, xid, recover=recover
+ )
+
+ def do_commit_twophase(
+ self, connection, xid, is_prepared=True, recover=False
+ ):
+ dbapi_conn = connection.connection.dbapi_connection
+ self._do_twophase(
+ dbapi_conn, dbapi_conn.tpc_commit, xid, recover=recover
+ )
+
+ def do_recover_twophase(self, connection):
+ return [row[1] for row in connection.connection.tpc_recover()]
self.do_begin(connection.connection)
def do_prepare_twophase(self, connection, xid):
- connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
+ connection.execute(
+ sql.text("PREPARE TRANSACTION :xid'").bindparams(
+ sql.bindparam("xid", xid, literal_execute=True)
+ )
+ )
def do_rollback_twophase(
self, connection, xid, is_prepared=True, recover=False
# Must find out a way how to make the dbapi not
# open a transaction.
connection.exec_driver_sql("ROLLBACK")
- connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
+ connection.execute(
+ sql.text("ROLLBACK PREPARED :xid").bindparams(
+ sql.bindparam("xid", xid, literal_execute=True)
+ )
+ )
connection.exec_driver_sql("BEGIN")
self.do_rollback(connection.connection)
else:
if is_prepared:
if recover:
connection.exec_driver_sql("ROLLBACK")
- connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
+ connection.execute(
+ sql.text("COMMIT PREPARED :xid").bindparams(
+ sql.bindparam("xid", xid, literal_execute=True)
+ )
+ )
connection.exec_driver_sql("BEGIN")
self.do_rollback(connection.connection)
else:
def drop_all_schema_objects_pre_tables(cfg, eng):
with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
for xid in conn.exec_driver_sql(
- "select gid from pg_prepared_xacts"
+ "SELECT gid FROM pg_prepared_xacts "
+ "WHERE database = current_database()"
).scalars():
- conn.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
+ eng.dialect.do_rollback_twophase(conn, xid, recover=True)
@drop_all_schema_objects_post_tables.for_db("postgresql")
return True
return False
- def _do_prepared_twophase(self, connection, command, recover=False):
- dbapi_conn = connection.connection.dbapi_connection
- if (
- recover
- # don't rely on psycopg providing enum symbols, compare with
- # eq/ne
- or dbapi_conn.info.transaction_status
- != self._psycopg_TransactionStatus.IDLE
- ):
- dbapi_conn.rollback()
- before_autocommit = dbapi_conn.autocommit
- try:
- if not before_autocommit:
- self._do_autocommit(dbapi_conn, True)
- with dbapi_conn.cursor() as cursor:
- cursor.execute(command)
- finally:
- if not before_autocommit:
- self._do_autocommit(dbapi_conn, before_autocommit)
-
- def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if is_prepared:
- self._do_prepared_twophase(
- connection, f"ROLLBACK PREPARED '{xid}'", recover=recover
- )
- else:
- self.do_rollback(connection.connection)
-
- def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- if is_prepared:
- self._do_prepared_twophase(
- connection, f"COMMIT PREPARED '{xid}'", recover=recover
- )
- else:
- self.do_commit(connection.connection)
+ def _twophase_idle_check(self, dbapi_conn):
+ # don't rely on psycopg providing enum symbols, compare with eq/ne
+ return (
+ dbapi_conn.info.transaction_status
+ == self._psycopg_TransactionStatus.IDLE
+ )
@util.memoized_property
def _dialect_specific_select_one(self):
else:
return AsyncAdapt_psycopg_cursor(self)
+ def tpc_begin(self, xid):
+ return await_(self._connection.tpc_begin(xid))
+
+ def tpc_prepare(self):
+ return await_(self._connection.tpc_prepare())
+
+ def tpc_commit(self, xid=None):
+ return await_(self._connection.tpc_commit(xid))
+
+ def tpc_rollback(self, xid=None):
+ return await_(self._connection.tpc_rollback(xid))
+
+ def tpc_recover(self):
+ return await_(self._connection.tpc_recover())
+
class PsycopgAdaptDBAPI(AsyncAdapt_dbapi_module):
def __init__(self, psycopg, ExecStatus) -> None:
else:
cursor.executemany(statement, parameters)
- def do_begin_twophase(self, connection, xid):
- connection.connection.tpc_begin(xid)
-
- def do_prepare_twophase(self, connection, xid):
- connection.connection.tpc_prepare()
-
- def _do_twophase(self, dbapi_conn, operation, xid, recover=False):
- if recover:
- if dbapi_conn.status != self._psycopg2_extensions.STATUS_READY:
- dbapi_conn.rollback()
- operation(xid)
- else:
- operation()
-
- def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- dbapi_conn = connection.connection.dbapi_connection
- self._do_twophase(
- dbapi_conn, dbapi_conn.tpc_rollback, xid, recover=recover
- )
-
- def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
- dbapi_conn = connection.connection.dbapi_connection
- self._do_twophase(
- dbapi_conn, dbapi_conn.tpc_commit, xid, recover=recover
- )
+ def _twophase_idle_check(self, dbapi_conn):
+ return dbapi_conn.status == self._psycopg2_extensions.STATUS_READY
@util.memoized_instancemethod
def _hstore_oids(self, dbapi_connection):
:meth:`~.TwoPhaseTransaction.prepare` method.
:param xid: the two phase transaction id. If not supplied, a
- random id will be generated.
+ random id will be generated. The accepted type and value depends on
+ the driver in use.
.. seealso::
"hoho",
)
eng.update_execution_options(foo="hoho")
- conn = eng.connect()
- eq_(conn._execution_options["foo"], "hoho")
+ conn2 = eng.connect()
+ eq_(conn2._execution_options["foo"], "hoho")
+ conn2.close()
def test_generative_engine_execution_options(self):
eng = engines.testing_engine(
"commit_twophase",
],
)
+ conn.close()
class HandleErrorTest(fixtures.TestBase):