]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- refactor pool a bit so that intent between ConnectionRecord/ConnectionFairy is...
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 2 Jul 2013 17:14:21 +0000 (13:14 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 2 Jul 2013 17:14:21 +0000 (13:14 -0400)
make sure that the DBAPI connection passed to the reset-on-return events/dialect hooks
is also a "fairy", so that dictionaries like "info" are available.  [ticket:2770]
- rework the execution_options system so that the dialect is given the job of making
any immediate adjustments based on a set event.  move the "isolation level" logic to use
this new system.   Also work things out so that even engine-level execution options
can be used for things like isolation level; the dialect attaches a connect-event
handler in this case to handle the task.
- to support this new system as well as further extensibiltiy of execution options
add events engine_connect(), set_connection_execution_options(), set_engine_execution_options()

doc/build/changelog/changelog_09.rst
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/engine/default.py
lib/sqlalchemy/engine/strategies.py
lib/sqlalchemy/events.py
lib/sqlalchemy/pool.py
test/engine/test_execute.py
test/engine/test_pool.py
test/engine/test_transaction.py
test/profiles.txt

index 65e05d89b9f8e6cd4547fc488c2923ca53fde11a..989dc433de37e50ac7029b830a3dfb989df1e641 100644 (file)
@@ -6,6 +6,16 @@
 .. changelog::
     :version: 0.9.0
 
+    .. change::
+        :tags: feature, engine
+        :tickets: 2770
+
+        New events added to :class:`.ConnectionEvents`:
+
+        * :meth:`.ConnectionEvents.engine_connect`
+        * :meth:`.ConnectionEvents.set_connection_execution_options`
+        * :meth:`.ConnectionEvents.set_engine_execution_options`
+
     .. change::
         :tags: feature, firebird
         :tickets: 2763
index 2d9f3af94ec47b4254cfa9040734b0205718f57d..f69bd3d4b80120bdd10dd077985d819ddb63bfa7 100644 (file)
@@ -46,7 +46,7 @@ class Connection(Connectable):
     def __init__(self, engine, connection=None, close_with_result=False,
                  _branch=False, _execution_options=None,
                  _dispatch=None,
-                 _has_events=False):
+                 _has_events=None):
         """Construct a new Connection.
 
         The constructor here is not public and is only called only by an
@@ -67,7 +67,8 @@ class Connection(Connectable):
             self.dispatch = _dispatch
         elif engine._has_events:
             self.dispatch = self.dispatch._join(engine.dispatch)
-        self._has_events = _has_events or engine._has_events
+        self._has_events = _has_events or (
+                                _has_events is None and engine._has_events)
 
         self._echo = self.engine._should_log_info()
         if _execution_options:
@@ -76,6 +77,9 @@ class Connection(Connectable):
         else:
             self._execution_options = engine._execution_options
 
+        if self._has_events:
+            self.dispatch.engine_connect(self, _branch)
+
     def _branch(self):
         """Return a new Connection which references this Connection's
         engine and connection; but does not have close_with_result enabled,
@@ -200,16 +204,11 @@ class Connection(Connectable):
         """
         c = self._clone()
         c._execution_options = c._execution_options.union(opt)
-        if 'isolation_level' in opt:
-            c._set_isolation_level()
+        if self._has_events:
+            self.dispatch.set_connection_execution_options(c, opt)
+        self.dialect.set_connection_execution_options(c, opt)
         return c
 
-    def _set_isolation_level(self):
-        self.dialect.set_isolation_level(self.connection,
-                                self._execution_options['isolation_level'])
-        self.connection._connection_record.finalize_callback = \
-                    self.dialect.reset_isolation_level
-
     @property
     def closed(self):
         """Return True if this connection is closed."""
@@ -1336,15 +1335,10 @@ class Engine(Connectable, log.Identified):
             :meth:`.Engine.execution_options`
 
         """
-        if 'isolation_level' in opt:
-            raise exc.ArgumentError(
-                "'isolation_level' execution option may "
-                "only be specified on Connection.execution_options(). "
-                "To set engine-wide isolation level, "
-                "use the isolation_level argument to create_engine()."
-            )
         self._execution_options = \
                 self._execution_options.union(opt)
+        self.dispatch.set_engine_execution_options(self, opt)
+        self.dialect.set_engine_execution_options(self, opt)
 
     def execution_options(self, **opt):
         """Return a new :class:`.Engine` that will provide
index 2ad7002c44461ab7f27cd1e5e59a66bf60771299..3e8e96a42c7a82b215cf0fc771ef102e46b03f57 100644 (file)
@@ -19,6 +19,7 @@ from ..sql import compiler, expression
 from .. import exc, types as sqltypes, util, pool, processors
 import codecs
 import weakref
+from .. import event
 
 AUTOCOMMIT_REGEXP = re.compile(
             r'\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER)',
@@ -289,6 +290,24 @@ class DefaultDialect(interfaces.Dialect):
         opts.update(url.query)
         return [[], opts]
 
+    def set_engine_execution_options(self, engine, opts):
+        if 'isolation_level' in opts:
+            isolation_level = opts['isolation_level']
+            @event.listens_for(engine, "engine_connect")
+            def set_isolation(connection, branch):
+                if not branch:
+                    self._set_connection_isolation(connection, isolation_level)
+
+    def set_connection_execution_options(self, connection, opts):
+        if 'isolation_level' in opts:
+            self._set_connection_isolation(connection, opts['isolation_level'])
+
+    def _set_connection_isolation(self, connection, level):
+        self.set_isolation_level(connection.connection, level)
+        connection.connection._connection_record.\
+            finalize_callback.append(self.reset_isolation_level)
+
+
     def do_begin(self, dbapi_connection):
         pass
 
index c65986ca27cc4592ea449db4db666bc591f7477d..3ca91968b302e96aca54f402952dee12811280a7 100644 (file)
@@ -150,13 +150,8 @@ class DefaultEngineStrategy(EngineStrategy):
                 event.listen(pool, 'connect', on_connect)
 
             def first_connect(dbapi_connection, connection_record):
-                c = base.Connection(engine, connection=dbapi_connection)
-
-                # TODO: removing this allows the on connect activities
-                # to generate events.  tests currently assume these aren't
-                # sent.  do we want users to get all the initial connect
-                # activities as events ?
-                c._has_events = False
+                c = base.Connection(engine, connection=dbapi_connection,
+                            _has_events=False)
 
                 dialect.initialize(c)
             event.listen(pool, 'first_connect', first_connect)
index ae2e4ed93cc8349002c6e1da864dda4563a2596a..7f11232ac40adce7e2bc20c58099c363bb601af2 100644 (file)
@@ -319,6 +319,10 @@ class PoolEvents(event.Events):
         connection will be disposed and a fresh connection retrieved.
         Processing of all checkout listeners will abort and restart
         using the new connection.
+
+        .. seealso:: :meth:`.ConnectionEvents.connect` - a similar event
+           which occurs upon creation of a new :class:`.Connection`.
+
         """
 
     def checkin(self, dbapi_connection, connection_record):
@@ -615,6 +619,103 @@ class ConnectionEvents(event.Events):
 
         """
 
+    def engine_connect(self, conn, branch):
+        """Intercept the creation of a new :class:`.Connection`.
+
+        This event is called typically as the direct result of calling
+        the :meth:`.Engine.connect` method.
+
+        It differs from the :meth:`.PoolEvents.connect` method, which
+        refers to the actual connection to a database at the DBAPI level;
+        a DBAPI connection may be pooled and reused for many operations.
+        In contrast, this event refers only to the production of a higher level
+        :class:`.Connection` wrapper around such a DBAPI connection.
+
+        It also differs from the :meth:`.PoolEvents.checkout` event
+        in that it is specific to the :class:`.Connection` object, not the
+        DBAPI connection that :meth:`.PoolEvents.checkout` deals with, although
+        this DBAPI connection is available here via the :attr:`.Connection.connection`
+        attribute.  But note there can in fact
+        be multiple :meth:`.PoolEvents.checkout` events within the lifespan
+        of a single :class:`.Connection` object, if that :class:`.Connection`
+        is invalidated and re-established.  There can also be multiple
+        :class:`.Connection` objects generated for the same already-checked-out
+        DBAPI connection, in the case that a "branch" of a :class:`.Connection`
+        is produced.
+
+        :param conn: :class:`.Connection` object.
+        :param branch: if True, this is a "branch" of an existing
+         :class:`.Connection`.  A branch is generated within the course
+         of a statement execution to invoke supplemental statements, most
+         typically to pre-execute a SELECT of a default value for the purposes
+         of an INSERT statement.
+
+        .. versionadded:: 0.9.0
+
+        .. seealso::
+
+            :meth:`.PoolEvents.checkout` the lower-level pool checkout event
+            for an individual DBAPI connection
+
+            :meth:`.ConnectionEvents.set_connection_execution_options` - a copy of a
+            :class:`.Connection` is also made when the
+            :meth:`.Connection.execution_options` method is called.
+
+        """
+
+    def set_connection_execution_options(self, conn, opts):
+        """Intercept when the :meth:`.Connection.execution_options`
+        method is called.
+
+        This method is called after the new :class:`.Connection` has been
+        produced, with the newly updated execution options collection, but
+        before the :class:`.Dialect` has acted upon any of those new options.
+
+        Note that this method is not called when a new :class:`.Connection`
+        is produced which is inheriting execution options from its parent
+        :class:`.Engine`; to intercept this condition, use the
+        :meth:`.ConnectionEvents.connect` event.
+
+        :param conn: The newly copied :class:`.Connection` object
+
+        :param opts: dictionary of options that were passed to the
+         :meth:`.Connection.execution_options` method.
+
+        .. versionadded:: 0.9.0
+
+        .. seealso::
+
+            :meth:`.ConnectionEvents.set_engine_execution_options` - event
+            which is called when :meth:`.Engine.execution_options` is called.
+
+
+        """
+
+    def set_engine_execution_options(self, engine, opts):
+        """Intercept when the :meth:`.Engine.execution_options`
+        method is called.
+
+        The :meth:`.Engine.execution_options` method produces a shallow
+        copy of the :class:`.Engine` which stores the new options.  That new
+        :class:`.Engine` is passed here.   A particular application of this
+        method is to add a :meth:`.ConnectionEvents.engine_connect` event
+        handler to the given :class:`.Engine` which will perform some per-
+        :class:`.Connection` task specific to these execution options.
+
+        :param conn: The newly copied :class:`.Engine` object
+
+        :param opts: dictionary of options that were passed to the
+         :meth:`.Connection.execution_options` method.
+
+        .. versionadded:: 0.9.0
+
+        .. seealso::
+
+            :meth:`.ConnectionEvents.set_connection_execution_options` - event
+            which is called when :meth:`.Connection.execution_options` is called.
+
+        """
+
     def begin(self, conn):
         """Intercept begin() events.
 
index dcf3d9e3943411cfd0fb8e471e011ff5b4356a74..97411dd3a3c132a9df4c73682429fb96c26a0b7e 100644 (file)
@@ -25,6 +25,7 @@ from .util import queue as sqla_queue
 from .util import threading, memoized_property, \
     chop_traceback
 
+from collections import deque
 proxies = {}
 
 
@@ -217,7 +218,7 @@ class Pool(log.Identified):
 
         """
 
-        return _ConnectionFairy(self).checkout()
+        return _ConnectionFairy.checkout(self)
 
     def _create_connection(self):
         """Called by subclasses to create a new ConnectionRecord."""
@@ -269,18 +270,16 @@ class Pool(log.Identified):
 
         """
         if not self._use_threadlocal:
-            return _ConnectionFairy(self).checkout()
+            return _ConnectionFairy.checkout(self)
 
         try:
             rec = self._threadconns.current()
-            if rec:
-                return rec.checkout()
         except AttributeError:
             pass
+        else:
+            return rec.checkout_existing()
 
-        agent = _ConnectionFairy(self)
-        self._threadconns.current = weakref.ref(agent)
-        return agent.checkout()
+        return _ConnectionFairy.checkout(self, self._threadconns)
 
     def _return_conn(self, record):
         """Given a _ConnectionRecord, return it to the :class:`.Pool`.
@@ -311,11 +310,11 @@ class Pool(log.Identified):
 
 
 class _ConnectionRecord(object):
-    finalize_callback = None
 
     def __init__(self, pool):
         self.__pool = pool
         self.connection = self.__connect()
+        self.finalize_callback = deque()
 
         pool.dispatch.first_connect.\
                     for_modify(pool.dispatch).\
@@ -326,6 +325,36 @@ class _ConnectionRecord(object):
     def info(self):
         return {}
 
+    @classmethod
+    def checkout(cls, pool):
+        rec = pool._do_get()
+        dbapi_connection = rec.get_connection()
+        fairy = _ConnectionFairy(dbapi_connection, rec)
+        rec.fairy_ref = weakref.ref(
+                        fairy,
+                        lambda ref: _finalize_fairy and \
+                            _finalize_fairy(
+                                    dbapi_connection,
+                                    rec, pool, ref, pool._echo)
+                    )
+        _refs.add(rec)
+        if pool._echo:
+            pool.logger.debug("Connection %r checked out from pool",
+                       dbapi_connection)
+        return fairy
+
+    def checkin(self):
+        self.fairy_ref = None
+        connection = self.connection
+        pool = self.__pool
+        while self.finalize_callback:
+            finalizer = self.finalize_callback.pop()
+            finalizer(connection)
+        if pool.dispatch.checkin:
+            pool.dispatch.checkin(connection, self)
+        pool._return_conn(self)
+
+
     def close(self):
         if self.connection is not None:
             self.__pool._close_connection(self.connection)
@@ -373,11 +402,15 @@ class _ConnectionRecord(object):
             raise
 
 
-def _finalize_fairy(connection, connection_record, pool, ref, echo):
+def _finalize_fairy(connection, connection_record, pool, ref, echo, fairy=None):
+    """Cleanup for a :class:`._ConnectionFairy` whether or not it's already
+    been garbage collected.
+
+    """
     _refs.discard(connection_record)
 
     if ref is not None and \
-                connection_record.fairy is not ref:
+                connection_record.fairy_ref is not ref:
         return
 
     if connection is not None:
@@ -386,35 +419,31 @@ def _finalize_fairy(connection, connection_record, pool, ref, echo):
                                     connection)
 
         try:
+            fairy = fairy or _ConnectionFairy(connection, connection_record)
             if pool.dispatch.reset:
-                pool.dispatch.reset(connection, connection_record)
+                pool.dispatch.reset(fairy, connection_record)
             if pool._reset_on_return is reset_rollback:
                 if echo:
                     pool.logger.debug("Connection %s rollback-on-return",
                                                     connection)
-                pool._dialect.do_rollback(connection)
+                pool._dialect.do_rollback(fairy)
             elif pool._reset_on_return is reset_commit:
                 if echo:
-                    pool.logger.debug("Conneciton %s commit-on-return",
+                    pool.logger.debug("Connection %s commit-on-return",
                                                     connection)
-                pool._dialect.do_commit(connection)
+                pool._dialect.do_commit(fairy)
+
             # Immediately close detached instances
-            if connection_record is None:
+            if not connection_record:
                 pool._close_connection(connection)
         except Exception as e:
-            if connection_record is not None:
+            if connection_record:
                 connection_record.invalidate(e=e)
             if isinstance(e, (SystemExit, KeyboardInterrupt)):
                 raise
 
-    if connection_record is not None:
-        connection_record.fairy = None
-        if connection_record.finalize_callback:
-            connection_record.finalize_callback(connection)
-            del connection_record.finalize_callback
-        if pool.dispatch.checkin:
-            pool.dispatch.checkin(connection, connection_record)
-        pool._return_conn(connection_record)
+    if connection_record:
+        connection_record.checkin()
 
 
 _refs = set()
@@ -424,27 +453,58 @@ class _ConnectionFairy(object):
     """Proxies a DB-API connection and provides return-on-dereference
     support."""
 
-    def __init__(self, pool):
-        self._pool = pool
-        self.__counter = 0
-        self._echo = _echo = pool._should_log_debug()
-        try:
-            rec = self._connection_record = pool._do_get()
-            conn = self.connection = self._connection_record.get_connection()
-            rec.fairy = weakref.ref(
-                            self,
-                            lambda ref: _finalize_fairy and \
-                                _finalize_fairy(conn, rec, pool, ref, _echo)
-                        )
-            _refs.add(rec)
-        except:
-            # helps with endless __getattr__ loops later on
-            self.connection = None
-            self._connection_record = None
-            raise
-        if self._echo:
-            self._pool.logger.debug("Connection %r checked out from pool",
-                       self.connection)
+    def __init__(self, dbapi_connection, connection_record):
+        self.connection = dbapi_connection
+        self._connection_record = connection_record
+
+    @classmethod
+    def checkout(cls, pool, threadconns=None, fairy=None):
+        if not fairy:
+            fairy = _ConnectionRecord.checkout(pool)
+
+            fairy._pool = pool
+            fairy._counter = 0
+            fairy._echo = pool._should_log_debug()
+
+            if threadconns is not None:
+                threadconns.current = weakref.ref(fairy)
+
+        if fairy.connection is None:
+            raise exc.InvalidRequestError("This connection is closed")
+        fairy._counter += 1
+
+        if not pool.dispatch.checkout or fairy._counter != 1:
+            return fairy
+
+        # Pool listeners can trigger a reconnection on checkout
+        attempts = 2
+        while attempts > 0:
+            try:
+                pool.dispatch.checkout(fairy.connection,
+                                            fairy._connection_record,
+                                            fairy)
+                return fairy
+            except exc.DisconnectionError as e:
+                pool.logger.info(
+                    "Disconnection detected on checkout: %s", e)
+                fairy._connection_record.invalidate(e)
+                fairy.connection = fairy._connection_record.get_connection()
+                attempts -= 1
+
+        pool.logger.info("Reconnection attempts exhausted on checkout")
+        fairy.invalidate()
+        raise exc.InvalidRequestError("This connection is closed")
+
+    def checkout_existing(self):
+        return _ConnectionFairy.checkout(self._pool, fairy=self)
+
+    def checkin(self):
+        _finalize_fairy(self.connection, self._connection_record,
+                            self._pool, None, self._echo, fairy=self)
+        self.connection = None
+        self._connection_record = None
+
+    _close = checkin
 
     @property
     def _logger(self):
@@ -465,10 +525,7 @@ class _ConnectionFairy(object):
         in subsequent instances of :class:`.ConnectionFairy`.
 
         """
-        try:
-            return self._connection_record.info
-        except AttributeError:
-            raise exc.InvalidRequestError("This connection is closed")
+        return self._connection_record.info
 
     def invalidate(self, e=None):
         """Mark this connection as invalidated.
@@ -479,10 +536,10 @@ class _ConnectionFairy(object):
 
         if self.connection is None:
             raise exc.InvalidRequestError("This connection is closed")
-        if self._connection_record is not None:
+        if self._connection_record:
             self._connection_record.invalidate(e=e)
         self.connection = None
-        self._close()
+        self.checkin()
 
     def cursor(self, *args, **kwargs):
         return self.connection.cursor(*args, **kwargs)
@@ -490,32 +547,6 @@ class _ConnectionFairy(object):
     def __getattr__(self, key):
         return getattr(self.connection, key)
 
-    def checkout(self):
-        if self.connection is None:
-            raise exc.InvalidRequestError("This connection is closed")
-        self.__counter += 1
-
-        if not self._pool.dispatch.checkout or self.__counter != 1:
-            return self
-
-        # Pool listeners can trigger a reconnection on checkout
-        attempts = 2
-        while attempts > 0:
-            try:
-                self._pool.dispatch.checkout(self.connection,
-                                            self._connection_record,
-                                            self)
-                return self
-            except exc.DisconnectionError as e:
-                self._pool.logger.info(
-                "Disconnection detected on checkout: %s", e)
-                self._connection_record.invalidate(e)
-                self.connection = self._connection_record.get_connection()
-                attempts -= 1
-
-        self._pool.logger.info("Reconnection attempts exhausted on checkout")
-        self.invalidate()
-        raise exc.InvalidRequestError("This connection is closed")
 
     def detach(self):
         """Separate this connection from its Pool.
@@ -532,22 +563,17 @@ class _ConnectionFairy(object):
 
         if self._connection_record is not None:
             _refs.remove(self._connection_record)
-            self._connection_record.fairy = None
+            self._connection_record.fairy_ref = None
             self._connection_record.connection = None
             self._pool._do_return_conn(self._connection_record)
             self.info = self.info.copy()
             self._connection_record = None
 
     def close(self):
-        self.__counter -= 1
-        if self.__counter == 0:
-            self._close()
+        self._counter -= 1
+        if self._counter == 0:
+            self.checkin()
 
-    def _close(self):
-        _finalize_fairy(self.connection, self._connection_record,
-                            self._pool, None, self._echo)
-        self.connection = None
-        self._connection_record = None
 
 
 class SingletonThreadPool(Pool):
index 9795e4c1080fbf2cd4b4bf07c0dd125e6aba113f..1d2aebf972203a9b0d16f6e6f556d20f2437ace2 100644 (file)
@@ -1014,6 +1014,53 @@ class ResultProxyTest(fixtures.TestBase):
             finally:
                 r.close()
 
+class ExecutionOptionsTest(fixtures.TestBase):
+    def test_dialect_conn_options(self):
+        engine = testing_engine("sqlite://")
+        engine.dialect = Mock()
+        conn = engine.connect()
+        c2 = conn.execution_options(foo="bar")
+        eq_(
+            engine.dialect.set_connection_execution_options.mock_calls,
+            [call(c2, {"foo": "bar"})]
+        )
+
+    def test_dialect_engine_options(self):
+        engine = testing_engine("sqlite://")
+        engine.dialect = Mock()
+        e2 = engine.execution_options(foo="bar")
+        eq_(
+            engine.dialect.set_engine_execution_options.mock_calls,
+            [call(e2, {"foo": "bar"})]
+        )
+
+    def test_dialect_engine_construction_options(self):
+        dialect = Mock()
+        engine = Engine(Mock(), dialect, Mock(),
+                                execution_options={"foo": "bar"})
+        eq_(
+            dialect.set_engine_execution_options.mock_calls,
+            [call(engine, {"foo": "bar"})]
+        )
+
+    def test_propagate_engine_to_connection(self):
+        engine = testing_engine("sqlite://",
+                        options=dict(execution_options={"foo": "bar"}))
+        conn = engine.connect()
+        eq_(conn._execution_options, {"foo": "bar"})
+
+    def test_propagate_option_engine_to_connection(self):
+        e1 = testing_engine("sqlite://",
+                        options=dict(execution_options={"foo": "bar"}))
+        e2 = e1.execution_options(bat="hoho")
+        c1 = e1.connect()
+        c2 = e2.connect()
+        eq_(c1._execution_options, {"foo": "bar"})
+        eq_(c2._execution_options, {"foo": "bar", "bat": "hoho"})
+
+
+
+
 
 class AlternateResultProxyTest(fixtures.TestBase):
     __requires__ = ('sqlite', )
@@ -1101,63 +1148,58 @@ class EngineEventsTest(fixtures.TestBase):
         e1 = testing_engine(config.db_url)
         e2 = testing_engine(config.db_url)
 
-        canary = []
-        def before_exec(conn, stmt, *arg):
-            canary.append(stmt)
-        event.listen(e1, "before_execute", before_exec)
+        canary = Mock()
+        event.listen(e1, "before_execute", canary)
         s1 = select([1])
         s2 = select([2])
         e1.execute(s1)
         e2.execute(s2)
-        eq_(canary, [s1])
-        event.listen(e2, "before_execute", before_exec)
+        eq_(
+            [arg[1][1] for arg in canary.mock_calls], [s1]
+        )
+        event.listen(e2, "before_execute", canary)
         e1.execute(s1)
         e2.execute(s2)
-        eq_(canary, [s1, s1, s2])
+        eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2])
 
     def test_per_engine_plus_global(self):
-        canary = []
-        def be1(conn, stmt, *arg):
-            canary.append('be1')
-        def be2(conn, stmt, *arg):
-            canary.append('be2')
-        def be3(conn, stmt, *arg):
-            canary.append('be3')
-
-        event.listen(Engine, "before_execute", be1)
+        canary = Mock()
+        event.listen(Engine, "before_execute", canary.be1)
         e1 = testing_engine(config.db_url)
         e2 = testing_engine(config.db_url)
 
-        event.listen(e1, "before_execute", be2)
+        event.listen(e1, "before_execute", canary.be2)
 
-        event.listen(Engine, "before_execute", be3)
+        event.listen(Engine, "before_execute", canary.be3)
         e1.connect()
         e2.connect()
-        canary[:] = []
+
         e1.execute(select([1]))
+        canary.be1.assert_call_count(1)
+        canary.be2.assert_call_count(1)
+
         e2.execute(select([1]))
 
-        eq_(canary, ['be1', 'be3', 'be2', 'be1', 'be3'])
+        canary.be1.assert_call_count(2)
+        canary.be2.assert_call_count(1)
+        canary.be3.assert_call_count(2)
 
     def test_per_connection_plus_engine(self):
-        canary = []
-        def be1(conn, stmt, *arg):
-            canary.append('be1')
-        def be2(conn, stmt, *arg):
-            canary.append('be2')
+        canary = Mock()
         e1 = testing_engine(config.db_url)
 
-        event.listen(e1, "before_execute", be1)
+        event.listen(e1, "before_execute", canary.be1)
 
         conn = e1.connect()
-        event.listen(conn, "before_execute", be2)
-        canary[:] = []
+        event.listen(conn, "before_execute", canary.be2)
         conn.execute(select([1]))
 
-        eq_(canary, ['be2', 'be1'])
+        canary.be1.assert_call_count(1)
+        canary.be2.assert_call_count(1)
 
         conn._branch().execute(select([1]))
-        eq_(canary, ['be2', 'be1', 'be2', 'be1'])
+        canary.be1.assert_call_count(2)
+        canary.be2.assert_call_count(2)
 
     def test_argument_format_execute(self):
         def before_execute(conn, clauseelement, multiparams, params):
@@ -1320,6 +1362,44 @@ class EngineEventsTest(fixtures.TestBase):
             canary, ['execute', 'cursor_execute']
         )
 
+    def test_engine_connect(self):
+        engine = engines.testing_engine()
+
+        tracker = Mock()
+        event.listen(engine, "engine_connect", tracker)
+
+        c1 = engine.connect()
+        c2 = c1._branch()
+        c1.close()
+        eq_(
+            tracker.mock_calls,
+            [call(c1, False), call(c2, True)]
+        )
+
+    def test_execution_options(self):
+        engine = engines.testing_engine()
+
+        engine_tracker = Mock()
+        conn_tracker = Mock()
+
+        event.listen(engine, "set_engine_execution_options", engine_tracker)
+        event.listen(engine, "set_connection_execution_options", conn_tracker)
+
+        e2 = engine.execution_options(e1='opt_e1')
+        c1 = engine.connect()
+        c2 = c1.execution_options(c1='opt_c1')
+        c3 = e2.connect()
+        c4 = c3.execution_options(c3='opt_c3')
+        eq_(
+            engine_tracker.mock_calls,
+            [call(e2, {'e1': 'opt_e1'})]
+        )
+        eq_(
+            conn_tracker.mock_calls,
+            [call(c2, {"c1": "opt_c1"}), call(c4, {"c3": "opt_c3"})]
+        )
+
+
     @testing.requires.sequences
     @testing.provide_metadata
     def test_cursor_execute(self):
index 5b64a9f319479484f3501563e69ac06fc3c167a6..8c4bcd8b5a52ff783c85bf80ad14b6a0d6579458 100644 (file)
@@ -879,9 +879,10 @@ class QueuePoolTest(PoolTestBase):
                                    pool_size=2, timeout=timeout,
                                    max_overflow=max_overflow)
                 def waiter(p):
+                    success_key = (timeout, max_overflow)
                     conn = p.connect()
                     time.sleep(.5)
-                    success.append(True)
+                    success.append(success_key)
                     conn.close()
 
                 time.sleep(.2)
@@ -896,8 +897,8 @@ class QueuePoolTest(PoolTestBase):
                 c1.invalidate()
                 c2.invalidate()
                 p2 = p._replace()
-        time.sleep(2)
-        eq_(len(success), 12)
+        time.sleep(1)
+        eq_(len(success), 12, "successes: %s" % success)
 
     @testing.requires.threading_with_mock
     @testing.requires.python26
index 789c15444cf57e97712a7447a8285ac277194b6e..ffc12b5b97d7e02e0c876d3d1ffa6069c9a17b4b 100644 (file)
@@ -1279,15 +1279,15 @@ class IsolationLevelTest(fixtures.TestBase):
         )
 
 
-    def test_per_engine_bzzt(self):
-        assert_raises_message(
-            exc.ArgumentError,
-            r"'isolation_level' execution option may "
-            r"only be specified on Connection.execution_options\(\). "
-            r"To set engine-wide isolation level, "
-            r"use the isolation_level argument to create_engine\(\).",
-            create_engine,
-            testing.db.url,
-                execution_options={'isolation_level':
-                            self._non_default_isolation_level}
+    def test_per_engine(self):
+        # new in 0.9
+        eng = create_engine(testing.db.url,
+                            execution_options={
+                                'isolation_level':
+                                    self._non_default_isolation_level()}
+                        )
+        conn = eng.connect()
+        eq_(
+            eng.dialect.get_isolation_level(conn.connection),
+            self._non_default_isolation_level()
         )
index 4d89646390de8cf0a7b665efb799c9c1f69e1248..915cfadb4880fe8f2c024c6d09924ca39784592a 100644 (file)
@@ -168,19 +168,19 @@ test.aaa_profiling.test_orm.MergeTest.test_merge_no_load 3.3_sqlite_pysqlite_noc
 
 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect
 
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.6_sqlite_pysqlite_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_cextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_oracle_cx_oracle_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_cextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_cextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_nocextensions 82
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_postgresql_psycopg2_nocextensions 70
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_sqlite_pysqlite_nocextensions 70
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_oracle_cx_oracle_nocextensions 69
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_postgresql_psycopg2_nocextensions 69
-test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_sqlite_pysqlite_nocextensions 69
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.6_sqlite_pysqlite_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_cextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_mysql_mysqldb_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_oracle_cx_oracle_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_cextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_postgresql_psycopg2_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_cextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 2.7_sqlite_pysqlite_nocextensions 87
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_postgresql_psycopg2_nocextensions 75
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.2_sqlite_pysqlite_nocextensions 75
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_oracle_cx_oracle_nocextensions 74
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_postgresql_psycopg2_nocextensions 74
+test.aaa_profiling.test_pool.QueuePoolTest.test_first_connect 3.3_sqlite_pysqlite_nocextensions 74
 
 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect
 
@@ -200,19 +200,19 @@ test.aaa_profiling.test_pool.QueuePoolTest.test_second_connect 3.3_sqlite_pysqli
 
 # TEST: test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect
 
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.6_sqlite_pysqlite_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_cextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_oracle_cx_oracle_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_cextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_cextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_nocextensions 6
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_postgresql_psycopg2_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_sqlite_pysqlite_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_oracle_cx_oracle_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_postgresql_psycopg2_nocextensions 7
-test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_sqlite_pysqlite_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.6_sqlite_pysqlite_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_cextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_mysql_mysqldb_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_oracle_cx_oracle_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_cextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_postgresql_psycopg2_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_cextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 2.7_sqlite_pysqlite_nocextensions 7
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_postgresql_psycopg2_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.2_sqlite_pysqlite_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_oracle_cx_oracle_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_postgresql_psycopg2_nocextensions 8
+test.aaa_profiling.test_pool.QueuePoolTest.test_second_samethread_connect 3.3_sqlite_pysqlite_nocextensions 8
 
 # TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
 
@@ -232,19 +232,19 @@ test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_connection_execute
 
 # TEST: test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute
 
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.6_sqlite_pysqlite_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_cextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_cextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_cextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_nocextensions 68
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_postgresql_psycopg2_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_sqlite_pysqlite_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_oracle_cx_oracle_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_postgresql_psycopg2_nocextensions 66
-test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_sqlite_pysqlite_nocextensions 66
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.6_sqlite_pysqlite_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_cextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_mysql_mysqldb_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_oracle_cx_oracle_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_cextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_postgresql_psycopg2_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_cextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 2.7_sqlite_pysqlite_nocextensions 73
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_postgresql_psycopg2_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.2_sqlite_pysqlite_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_oracle_cx_oracle_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_postgresql_psycopg2_nocextensions 71
+test.aaa_profiling.test_resultset.ExecutionTest.test_minimal_engine_execute 3.3_sqlite_pysqlite_nocextensions 71
 
 # TEST: test.aaa_profiling.test_resultset.ResultSetTest.test_contains_doesnt_compile