Support for reflection of the "name" of primary key
constraints added, courtesy Dave Moore.
+ .. change::
+ :tags: informix
+
+ Some cruft regarding informix transaction handling has been
+ removed, including a feature that would skip calling
+ commit()/rollback() as well as some hardcoded isolation level
+ assumptions on begin().. The status of this dialect is not
+ well understood as we don't have any users working with it,
+ nor any access to an Informix database. If someone with
+ access to Informix wants to help test this dialect, please
+ let us know.
+
+ .. change::
+ :tags: pool, feature
+
+ The :class:`.Pool` will now log all connection.close()
+ operations equally, including closes which occur for
+ invalidated connections, detached connections, and connections
+ beyond the pool capacity.
+
+ .. change::
+ :tags: pool, feature
+ :tickets: 2611
+
+ The :class:`.Pool` now consults the :class:`.Dialect` for
+ functionality regarding how the connection should be
+ "auto rolled back", as well as closed. This grants more
+ control of transaction scope to the dialect, so that we
+ will be better able to implement transactional workarounds
+ like those potentially needed for pysqlite and cx_oracle.
+
+ .. change::
+ :tags: pool, feature
+
+ Added new :meth:`.PoolEvents.reset` hook to capture
+ the event before a connection is auto-rolled back, upon
+ return to the pool. Together with
+ :meth:`.ConnectionEvents.rollback` this allows all rollback
+ events to be intercepted.
+
.. changelog::
:version: 0.8.0b1
:released: October 30, 2012
conn.autocommit(False)
return connect
- def do_commit(self, connection):
- """Execute a COMMIT."""
-
- connection.commit()
-
- def do_rollback(self, connection):
- """Execute a ROLLBACK."""
-
- connection.rollback()
-
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
"""Return a Unicode SHOW TABLES from a given schema."""
# when there are no arguments.
cursor.execute(statement, parameters or [])
- def do_rollback(self, connection):
+ def do_rollback(self, dbapi_connection):
# Use the retaining feature, that keeps the transaction going
- connection.rollback(True)
+ dbapi_connection.rollback(True)
- def do_commit(self, connection):
+ def do_commit(self, dbapi_connection):
# Use the retaining feature, that keeps the transaction going
- connection.commit(True)
+ dbapi_connection.commit(True)
preparer = InformixIdentifierPreparer
default_paramstyle = 'qmark'
- def __init__(self, has_transactions=True, *args, **kwargs):
- self.has_transactions = has_transactions
- default.DefaultDialect.__init__(self, *args, **kwargs)
-
def initialize(self, connection):
super(InformixDialect, self).initialize(connection)
else:
self.max_identifier_length = 128
- def do_begin(self, connection):
- cu = connection.cursor()
- cu.execute('SET LOCK MODE TO WAIT')
- if self.has_transactions:
- cu.execute('SET ISOLATION TO REPEATABLE READ')
-
- def do_commit(self, connection):
- if self.has_transactions:
- connection.commit()
-
- def do_rollback(self, connection):
- if self.has_transactions:
- connection.rollback()
-
def _get_table_names(self, connection, schema, type, **kw):
schema = schema or self.default_schema_name
s = "select tabname, owner from systables where owner=? and tabtype=?"
cursor.close()
return val.upper().replace("-", " ")
- def do_commit(self, connection):
+ def do_commit(self, dbapi_connection):
"""Execute a COMMIT."""
# COMMIT/ROLLBACK were introduced in 3.23.15.
# Ignore commit/rollback if support isn't present, otherwise even basic
# operations via autocommit fail.
try:
- connection.commit()
+ dbapi_connection.commit()
except:
if self.server_version_info < (3, 23, 15):
args = sys.exc_info()[1].args
return
raise
- def do_rollback(self, connection):
+ def do_rollback(self, dbapi_connection):
"""Execute a ROLLBACK."""
try:
- connection.rollback()
+ dbapi_connection.rollback()
except:
if self.server_version_info < (3, 23, 15):
args = sys.exc_info()[1].args
else:
self.__transaction = None
- def _commit_impl(self):
+ def _commit_impl(self, autocommit=False):
if self._has_events:
self.dispatch.commit(self)
result.close(_autoclose_connection=False)
if self.__transaction is None and context.should_autocommit:
- self._commit_impl()
+ self._commit_impl(autocommit=True)
if result.closed and self.should_close_with_result:
self.close()
self.pool = pool
self.url = url
self.dialect = dialect
+ self.pool._dialect = dialect
if logging_name:
self.logging_name = logging_name
self.echo = echo
opts.update(url.query)
return [[], opts]
- def do_begin(self, connection):
- """Implementations might want to put logic here for turning
- autocommit on/off, etc.
- """
-
+ def do_begin(self, dbapi_connection):
pass
- def do_rollback(self, connection):
- """Implementations might want to put logic here for turning
- autocommit on/off, etc.
- """
+ def do_rollback(self, dbapi_connection):
+ dbapi_connection.rollback()
- connection.rollback()
-
- def do_commit(self, connection):
- """Implementations might want to put logic here for turning
- autocommit on/off, etc.
- """
+ def do_commit(self, dbapi_connection):
+ dbapi_connection.commit()
- connection.commit()
+ def do_close(self, dbapi_connection):
+ dbapi_connection.close()
def create_xid(self):
"""Create a random two-phase transaction ID.
raise NotImplementedError()
- def do_begin(self, connection):
- """Provide an implementation of *connection.begin()*, given a
- DB-API connection."""
+ def do_begin(self, dbapi_connection):
+ """Provide an implementation of ``connection.begin()``, given a
+ DB-API connection.
+
+ The DBAPI has no dedicated "begin" method and it is expected
+ that transactions are implicit. This hook is provided for those
+ DBAPIs that might need additional help in this area.
+
+ Note that :meth:`.Dialect.do_begin` is not called unless a
+ :class:`.Transaction` object is in use. The
+ :meth:`.Dialect.do_autocommit`
+ hook is provided for DBAPIs that need some extra commands emitted
+ after a commit in order to enter the next transaction, when the
+ SQLAlchemy :class:`.Connection` is used in it's default "autocommit"
+ mode.
+
+ :param dbapi_connection: a DBAPI connection, typically
+ proxied within a :class:`.ConnectionFairy`.
+
+ """
raise NotImplementedError()
- def do_rollback(self, connection):
- """Provide an implementation of *connection.rollback()*, given
- a DB-API connection."""
+ def do_rollback(self, dbapi_connection):
+ """Provide an implementation of ``connection.rollback()``, given
+ a DB-API connection.
+
+ :param dbapi_connection: a DBAPI connection, typically
+ proxied within a :class:`.ConnectionFairy`.
+
+ """
+
+ raise NotImplementedError()
+
+
+ def do_commit(self, dbapi_connection):
+ """Provide an implementation of ``connection.commit()``, given a
+ DB-API connection.
+
+ :param dbapi_connection: a DBAPI connection, typically
+ proxied within a :class:`.ConnectionFairy`.
+
+ """
+
+ raise NotImplementedError()
+
+ def do_close(self, dbapi_connection):
+ """Provide an implementation of ``connection.close()``, given a DBAPI
+ connection.
+
+ This hook is called by the :class:`.Pool` when a connection has been
+ detached from the pool, or is being returned beyond the normal
+ capacity of the pool.
+
+ .. versionadded:: 0.8
+
+ """
raise NotImplementedError()
raise NotImplementedError()
- def do_commit(self, connection):
- """Provide an implementation of *connection.commit()*, given a
- DB-API connection."""
+ def do_savepoint(self, connection, name):
+ """Create a savepoint with the given name.
- raise NotImplementedError()
+ :param connection: a :class:`.Connection`.
+ :param name: savepoint name.
- def do_savepoint(self, connection, name):
- """Create a savepoint with the given name on a SQLAlchemy
- connection."""
+ """
raise NotImplementedError()
def do_rollback_to_savepoint(self, connection, name):
- """Rollback a SQL Alchemy connection to the named savepoint."""
+ """Rollback a connection to the named savepoint.
+
+ :param connection: a :class:`.Connection`.
+ :param name: savepoint name.
+
+ """
raise NotImplementedError()
def do_release_savepoint(self, connection, name):
- """Release the named savepoint on a SQL Alchemy connection."""
+ """Release the named savepoint on a connection.
+
+ :param connection: a :class:`.Connection`.
+ :param name: savepoint name.
+ """
raise NotImplementedError()
def do_begin_twophase(self, connection, xid):
- """Begin a two phase transaction on the given connection."""
+ """Begin a two phase transaction on the given connection.
+
+ :param connection: a :class:`.Connection`.
+ :param xid: xid
+
+ """
raise NotImplementedError()
def do_prepare_twophase(self, connection, xid):
- """Prepare a two phase transaction on the given connection."""
+ """Prepare a two phase transaction on the given connection.
+
+ :param connection: a :class:`.Connection`.
+ :param xid: xid
+
+ """
raise NotImplementedError()
def do_rollback_twophase(self, connection, xid, is_prepared=True,
recover=False):
- """Rollback a two phase transaction on the given connection."""
+ """Rollback a two phase transaction on the given connection.
+
+ :param connection: a :class:`.Connection`.
+ :param xid: xid
+ :param is_prepared: whether or not
+ :meth:`.TwoPhaseTransaction.prepare` was called.
+ :param recover: if the recover flag was passed.
+
+ """
raise NotImplementedError()
def do_commit_twophase(self, connection, xid, is_prepared=True,
recover=False):
- """Commit a two phase transaction on the given connection."""
+ """Commit a two phase transaction on the given connection.
+
+
+ :param connection: a :class:`.Connection`.
+ :param xid: xid
+ :param is_prepared: whether or not
+ :meth:`.TwoPhaseTransaction.prepare` was called.
+ :param recover: if the recover flag was passed.
+
+ """
raise NotImplementedError()
def do_recover_twophase(self, connection):
"""Recover list of uncommited prepared two phase transaction
- identifiers on the given connection."""
+ identifiers on the given connection.
+
+ :param connection: a :class:`.Connection`.
+
+ """
raise NotImplementedError()
"""
+ def reset(self, dbapi_con, con_record):
+ """Called before the "reset" action occurs for a pooled connection.
+
+ This event represents
+ when the ``rollback()`` method is called on the DBAPI connection
+ before it is returned to the pool. The behavior of "reset" can
+ be controlled, including disabled, using the ``reset_on_return``
+ pool argument.
+
+
+ The :meth:`.PoolEvents.reset` event is usually followed by the
+ the :meth:`.PoolEvents.checkin` event is called, except in those
+ cases where the connection is discarded immediately after reset.
+
+ :param dbapi_con:
+ A raw DB-API connection
+
+ :param con_record:
+ The ``_ConnectionRecord`` that persistently manages the connection
+
+ .. versionadded:: 0.8
+
+ .. seealso::
+
+ :meth:`.ConnectionEvents.rollback`
+
+ :meth:`.ConnectionEvents.commit`
+
+ """
+
+
class ConnectionEvents(event.Events):
"""Available events for :class:`.Connectable`, which includes
"""
def rollback(self, conn):
- """Intercept rollback() events.
+ """Intercept rollback() events, as initiated by a
+ :class:`.Transaction`.
+
+ Note that the :class:`.Pool` also "auto-rolls back"
+ a DBAPI connection upon checkin, if the ``reset_on_return``
+ flag is set to its default value of ``'rollback'``.
+ To intercept this
+ rollback, use the :meth:`.PoolEvents.reset` hook.
:param conn: :class:`.Connection` object
+ .. seealso::
+
+ :meth:`.PoolEvents.reset`
+
"""
def commit(self, conn):
- """Intercept commit() events.
+ """Intercept commit() events, as initiated by a
+ :class:`.Transaction`.
+
+ Note that the :class:`.Pool` may also "auto-commit"
+ a DBAPI connection upon checkin, if the ``reset_on_return``
+ flag is set to the value ``'commit'``. To intercept this
+ commit, use the :meth:`.PoolEvents.reset` hook.
:param conn: :class:`.Connection` object
"""
reset_commit = util.symbol('reset_commit')
reset_none = util.symbol('reset_none')
+class _ConnDialect(object):
+ """partial implementation of :class:`.Dialect`
+ which provides DBAPI connection methods.
+
+ When a :class:`.Pool` is combined with an :class:`.Engine`,
+ the :class:`.Engine` replaces this with its own
+ :class:`.Dialect`.
+
+ """
+ def do_rollback(self, dbapi_connection):
+ dbapi_connection.rollback()
+
+ def do_commit(self, dbapi_connection):
+ dbapi_connection.commit()
+
+ def do_close(self, dbapi_connection):
+ dbapi_connection.close()
class Pool(log.Identified):
"""Abstract base class for connection pools."""
+ _dialect = _ConnDialect()
+
def __init__(self,
creator, recycle=-1, echo=None,
use_threadlocal=False,
reset_on_return=True,
listeners=None,
events=None,
- _dispatch=None):
+ _dispatch=None,
+ _dialect=None):
"""
Construct a Pool.
self.echo = echo
if _dispatch:
self.dispatch._update(_dispatch, only_propagate=False)
+ if _dialect:
+ self._dialect = _dialect
if events:
for fn, target in events:
event.listen(self, target, fn)
dispatch = event.dispatcher(events.PoolEvents)
+ def _close_connection(self, connection):
+ self.logger.debug("Closing connection %r", connection)
+ try:
+ self._dialect.do_close(connection)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except:
+ self.logger.debug("Exception closing connection %r",
+ connection)
+
@util.deprecated(
2.7, "Pool.add_listener is deprecated. Use event.listen()")
def add_listener(self, listener):
def close(self):
if self.connection is not None:
- self.__pool.logger.debug("Closing connection %r", self.connection)
- try:
- self.connection.close()
- except (SystemExit, KeyboardInterrupt):
- raise
- except:
- self.__pool.logger.debug("Exception closing connection %r",
- self.connection)
+ self.__pool._close_connection(self.connection)
def invalidate(self, e=None):
if e is not None:
return self.connection
def __close(self):
- try:
- self.__pool.logger.debug("Closing connection %r", self.connection)
- self.connection.close()
- except (SystemExit, KeyboardInterrupt):
- raise
- except Exception, e:
- self.__pool.logger.debug(
- "Connection %r threw an error on close: %s",
- self.connection, e)
+ self.__pool._close_connection(self.connection)
def __connect(self):
try:
if connection is not None:
try:
+ if pool.dispatch.reset:
+ pool.dispatch.reset(connection, connection_record)
if pool._reset_on_return is reset_rollback:
- connection.rollback()
+ pool._dialect.do_rollback(connection)
elif pool._reset_on_return is reset_commit:
- connection.commit()
+ pool._dialect.do_commit(connection)
# Immediately close detached instances
if connection_record is None:
- connection.close()
+ pool._close_connection(connection)
except Exception, e:
if connection_record is not None:
connection_record.invalidate(e=e)
echo=self.echo,
logging_name=self._orig_logging_name,
use_threadlocal=self._use_threadlocal,
- _dispatch=self.dispatch)
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def dispose(self):
"""Dispose of this pool."""
recycle=self._recycle, echo=self.echo,
logging_name=self._orig_logging_name,
use_threadlocal=self._use_threadlocal,
- _dispatch=self.dispatch)
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def dispose(self):
while True:
echo=self.echo,
logging_name=self._orig_logging_name,
use_threadlocal=self._use_threadlocal,
- _dispatch=self.dispatch)
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def dispose(self):
pass
reset_on_return=self._reset_on_return,
echo=self.echo,
logging_name=self._orig_logging_name,
- _dispatch=self.dispatch)
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def _create_connection(self):
return self._conn
self.logger.info("Pool recreating")
return self.__class__(self._creator, echo=self.echo,
logging_name=self._orig_logging_name,
- _dispatch=self.dispatch)
+ _dispatch=self.dispatch,
+ _dialect=self._dialect)
def _do_get(self):
if self._checked_out:
@classmethod
def teardown_class(cls):
- pool.clear_managers()
+ pool.clear_managers()
def _queuepool_fixture(self, **kw):
- dbapi = MockDBAPI()
- return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+ dbapi, pool = self._queuepool_dbapi_fixture(**kw)
+ return pool
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
- return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
+ return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
+ **kw)
class PoolTest(PoolTestBase):
def test_manager(self):
assert c1 is not c2
assert c1 is c3
+
+
+
def test_bad_args(self):
manager = pool.manage(MockDBAPI())
connection = manager.connect(None)
for row in cursor:
eq_(row, expected.pop(0))
+
def test_no_connect_on_recreate(self):
def creator():
raise Exception("no creates allowed")
self.assert_('foo2' in c.info)
+class PoolDialectTest(PoolTestBase):
+ def _dialect(self):
+ canary = []
+ class PoolDialect(object):
+ def do_rollback(self, dbapi_connection):
+ canary.append('R')
+ dbapi_connection.rollback()
+
+ def do_commit(self, dbapi_connection):
+ canary.append('C')
+ dbapi_connection.commit()
+
+ def do_close(self, dbapi_connection):
+ canary.append('CL')
+ dbapi_connection.close()
+ return PoolDialect(), canary
+
+ def _do_test(self, pool_cls, assertion):
+ mock_dbapi = MockDBAPI()
+ dialect, canary = self._dialect()
+
+ p = pool_cls(creator=mock_dbapi.connect)
+ p._dialect = dialect
+ conn = p.connect()
+ conn.close()
+ p.dispose()
+ p.recreate()
+ conn = p.connect()
+ conn.close()
+ eq_(canary, assertion)
+
+ def test_queue_pool(self):
+ self._do_test(pool.QueuePool, ['R', 'CL', 'R'])
+
+ def test_assertion_pool(self):
+ self._do_test(pool.AssertionPool, ['R', 'CL', 'R'])
+
+ def test_singleton_pool(self):
+ self._do_test(pool.SingletonThreadPool, ['R', 'CL', 'R'])
+
+ def test_null_pool(self):
+ self._do_test(pool.NullPool, ['R', 'CL', 'R', 'CL'])
-class PoolEventsTest(object): #PoolTestBase):
+ def test_static_pool(self):
+ self._do_test(pool.StaticPool, ['R', 'R'])
+
+
+class PoolEventsTest(PoolTestBase):
def _first_connect_event_fixture(self):
p = self._queuepool_fixture()
canary = []
return p, canary
+ def _reset_event_fixture(self):
+ p = self._queuepool_fixture()
+ canary = []
+ def reset(*arg, **kw):
+ canary.append('reset')
+ event.listen(p, 'reset', reset)
+
+ return p, canary
+
def test_first_connect_event(self):
p, canary = self._first_connect_event_fixture()
c1.close()
eq_(canary, ['checkin'])
+ def test_reset_event(self):
+ p, canary = self._reset_event_fixture()
+
+ c1 = p.connect()
+ eq_(canary, [])
+ c1.close()
+ eq_(canary, ['reset'])
+
def test_checkin_event_gc(self):
p, canary = self._checkin_event_fixture()
# assertions are raised if the counts do not match.
#
# To add a new callcount test, apply the function_call_count
-# decorator and re-run the tests using the --write-profiles option -
-# this file will be rewritten including the new count.
+# decorator and re-run the tests using the --write-profiles
+# option - this file will be rewritten including the new count.
#
# TEST: test.aaa_profiling.test_compiler.CompileTest.test_insert
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.6_sqlite_pysqlite_nocextensions 1174
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_mysql_mysqldb_cextensions 1341
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_mysql_mysqldb_nocextensions 1366
-test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_postgresql_psycopg2_cextensions 1200
+test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_postgresql_psycopg2_cextensions 1275
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_postgresql_psycopg2_nocextensions 1225
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_sqlite_pysqlite_cextensions 1149
test.aaa_profiling.test_orm.MergeTest.test_merge_load 2.7_sqlite_pysqlite_nocextensions 1174
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.5_sqlite_pysqlite_nocextensions 14413
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.6_sqlite_pysqlite_nocextensions 14414
-test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_cextensions 452
+test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_cextensions 476
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_mysql_mysqldb_nocextensions 14472
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_cextensions 20438
test.aaa_profiling.test_resultset.ResultSetTest.test_string 2.7_postgresql_psycopg2_nocextensions 34458
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.5_sqlite_pysqlite_nocextensions 14413
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.6_sqlite_pysqlite_nocextensions 14414
-test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_cextensions 452
+test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_cextensions 476
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_mysql_mysqldb_nocextensions 44472
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_cextensions 20438
test.aaa_profiling.test_resultset.ResultSetTest.test_unicode 2.7_postgresql_psycopg2_nocextensions 34458