- PG two phase was calling text() without the correct bind param format, previous compiler checkin revealed issue
self.do_begin(connection.connection)
def do_prepare_twophase(self, connection, xid):
- connection.execute(sql.text("PREPARE TRANSACTION %(tid)s", bindparams=[sql.bindparam('tid', xid)]))
+ connection.execute(sql.text("PREPARE TRANSACTION :tid", bindparams=[sql.bindparam('tid', xid)]))
def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):
if is_prepared:
#FIXME: ugly hack to get out of transaction context when commiting recoverable transactions
# Must find out a way how to make the dbapi not open a transaction.
connection.execute(sql.text("ROLLBACK"))
- connection.execute(sql.text("ROLLBACK PREPARED %(tid)s", bindparams=[sql.bindparam('tid', xid)]))
+ connection.execute(sql.text("ROLLBACK PREPARED :tid", bindparams=[sql.bindparam('tid', xid)]))
connection.execute(sql.text("BEGIN"))
self.do_rollback(connection.connection)
else:
if is_prepared:
if recover:
connection.execute(sql.text("ROLLBACK"))
- connection.execute(sql.text("COMMIT PREPARED %(tid)s", bindparams=[sql.bindparam('tid', xid)]))
+ connection.execute(sql.text("COMMIT PREPARED :tid", bindparams=[sql.bindparam('tid', xid)]))
connection.execute(sql.text("BEGIN"))
self.do_rollback(connection.connection)
else:
return self.__transaction is not None
def _begin_impl(self):
- if self.__connection.is_valid:
- if self.__engine._should_log_info:
- self.__engine.logger.info("BEGIN")
- try:
- self.__engine.dialect.do_begin(self.connection)
- except Exception, e:
- raise exceptions.DBAPIError.instance(None, None, e)
+ if self.__engine._should_log_info:
+ self.__engine.logger.info("BEGIN")
+ try:
+ self.__engine.dialect.do_begin(self.__connection)
+ except Exception, e:
+ raise exceptions.DBAPIError.instance(None, None, e)
def _rollback_impl(self):
if self.__connection.is_valid:
if self.__engine._should_log_info:
self.__engine.logger.info("ROLLBACK")
try:
- self.__engine.dialect.do_rollback(self.connection)
+ self.__engine.dialect.do_rollback(self.__connection)
except Exception, e:
raise exceptions.DBAPIError.instance(None, None, e)
self.__transaction = None
def _commit_impl(self):
- if self.__connection.is_valid:
- if self.__engine._should_log_info:
- self.__engine.logger.info("COMMIT")
- try:
- self.__engine.dialect.do_commit(self.connection)
- except Exception, e:
- raise exceptions.DBAPIError.instance(None, None, e)
+ if self.__engine._should_log_info:
+ self.__engine.logger.info("COMMIT")
+ try:
+ self.__engine.dialect.do_commit(self.__connection)
+ except Exception, e:
+ raise exceptions.DBAPIError.instance(None, None, e)
self.__transaction = None
def _savepoint_impl(self, name=None):
self.dialect = context.dialect
self.closed = False
self.cursor = context.cursor
+ self.connection = context.root_connection
self.__echo = context.engine._should_log_info
if context.is_select():
self._init_metadata()
self._rowcount = context.get_rowcount()
self.close()
- connection = property(lambda self:self.context.root_connection)
-
def _get_rowcount(self):
if self._rowcount is not None:
return self._rowcount
class DefaultExecutionContext(base.ExecutionContext):
def __init__(self, dialect, connection, compiled=None, statement=None, parameters=None):
self.dialect = dialect
- self._connection = connection
+ self._connection = self.root_connection = connection
self.compiled = compiled
self._postfetch_cols = util.Set()
self.engine = connection.engine
connection = property(lambda s:s._connection._branch())
- root_connection = property(lambda s:s._connection)
-
def __encode_param_keys(self, params):
"""apply string encoding to the keys of dictionary-based bind parameters.
_logger = property(lambda self: self._pool.logger)
is_valid = property(lambda self:self.connection is not None)
-
+
def _get_properties(self):
"""A property collection unique to this DB-API connection."""
@testing.exclude('mysql', '<', (5, 0, 3))
def testmixedtwophasetransaction(self):
connection = testbase.db.connect()
-
+
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name='user1')
-
+
transaction2 = connection.begin()
connection.execute(users.insert(), user_id=2, user_name='user2')
-
+
transaction3 = connection.begin_nested()
connection.execute(users.insert(), user_id=3, user_name='user3')
-
+
transaction4 = connection.begin()
connection.execute(users.insert(), user_id=4, user_name='user4')
transaction4.commit()
-
+
transaction3.rollback()
-
+
connection.execute(users.insert(), user_id=5, user_name='user5')
-
+
transaction2.commit()
-
+
transaction.prepare()
-
+
transaction.commit()
-
+
self.assertEquals(
connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[(1,),(2,),(5,)]
)
connection.close()
-
+
@testing.supported('postgres')
def testtwophaserecover(self):
# MySQL recovery doesn't currently seem to work correctly