self.__connection = connection or engine.raw_connection()
self.__transaction = None
self.__close_with_result = close_with_result
+ def _get_connection(self):
+ try:
+ return self.__connection
+ except AttributeError:
+ raise exceptions.InvalidRequestError("This Connection is closed")
engine = property(lambda s:s.__engine, doc="The Engine with which this Connection is associated (read only)")
- connection = property(lambda s:s.__connection, doc="The underlying DBAPI connection managed by this Connection.")
+ connection = property(_get_connection, doc="The underlying DBAPI connection managed by this Connection.")
should_close_with_result = property(lambda s:s.__close_with_result, doc="Indicates if this Connection should be closed when a corresponding ResultProxy is closed; this is essentially an auto-release mode.")
def _create_transaction(self, parent):
return Transaction(self, parent)
def _begin_impl(self):
if self.__engine.echo:
self.__engine.log("BEGIN")
- self.__engine.dialect.do_begin(self.__connection)
+ self.__engine.dialect.do_begin(self.connection)
def _rollback_impl(self):
if self.__engine.echo:
self.__engine.log("ROLLBACK")
- self.__engine.dialect.do_rollback(self.__connection)
+ self.__engine.dialect.do_rollback(self.connection)
def _commit_impl(self):
if self.__engine.echo:
self.__engine.log("COMMIT")
- self.__engine.dialect.do_commit(self.__connection)
+ self.__engine.dialect.do_commit(self.connection)
def _autocommit(self, statement):
"""when no Transaction is present, this is called after executions to provide "autocommit" behavior."""
# TODO: have the dialect determine if autocommit can be set on the connection directly without this
if not self.in_transaction():
self._rollback_impl()
def close(self):
- if self.__connection is not None:
- self.__connection.close()
- self.__connection = None
+ try:
+ c = self.__connection
+ except AttributeError:
+ return
+ self.__connection.close()
+ self.__connection = None
+ del self.__connection
def scalar(self, object, parameters=None, **kwargs):
row = self.execute(object, parameters, **kwargs).fetchone()
if row is not None:
return self.execute_compiled(elem.compile(engine=self.__engine, parameters=param), *multiparams, **params)
def execute_compiled(self, compiled, *multiparams, **params):
"""executes a sql.Compiled object."""
- cursor = self.__connection.cursor()
+ cursor = self.connection.cursor()
parameters = [compiled.get_params(**m) for m in self._params_to_listofdicts(*multiparams, **params)]
if len(parameters) == 1:
parameters = parameters[0]
callable_(self)
def _execute_raw(self, statement, parameters=None, cursor=None, echo=None, context=None, **kwargs):
if cursor is None:
- cursor = self.__connection.cursor()
+ cursor = self.connection.cursor()
try:
if echo is True or self.__engine.echo is not False:
self.__engine.log(statement)
assert len(result.fetchall()) == 0
connection.close()
+ @testbase.unsupported('mysql')
def testnesting(self):
connection = testbase.db.connect()
transaction = connection.begin()
def tearDownAll(self):
metadata.drop_all(testbase.db)
-
+
+ @testbase.unsupported('sqlite')
def testrollback_deadlock(self):
"""test that returning connections to the pool clears any object locks."""
conn1 = testbase.db.connect()
finally:
external_connection.close()
+ @testbase.unsupported('mysql')
def testmixednesting(self):
"""tests nesting of transactions off the TLEngine directly inside of
tranasctions off the connection from the TLEngine"""
finally:
external_connection.close()
+ @testbase.unsupported('mysql')
def testmoremixednesting(self):
"""tests nesting of transactions off the connection from the TLEngine
inside of tranasctions off thbe TLEngine directly."""