self.__savepoint_seq = 0
self.__branch = _branch
self.__invalid = False
+ self._has_events = engine._has_events
self._echo = self.engine._should_log_info()
if _execution_options:
self._execution_options =\
def _begin_impl(self):
if self._echo:
self.engine.logger.info("BEGIN (implicit)")
+
+ if self._has_events:
+ self.engine.dispatch.begin(self)
+
try:
self.engine.dialect.do_begin(self.connection)
except Exception, e:
raise
def _rollback_impl(self):
+ if self._has_events:
+ self.engine.dispatch.rollback(self)
+
if not self.closed and not self.invalidated and \
self._connection_is_valid:
if self._echo:
self.__transaction = None
def _commit_impl(self):
+ if self._has_events:
+ self.engine.dispatch.commit(self)
+
if self._echo:
self.engine.logger.info("COMMIT")
try:
raise
def _savepoint_impl(self, name=None):
+ if self._has_events:
+ self.engine.dispatch.savepoint(self, name)
+
if name is None:
self.__savepoint_seq += 1
name = 'sa_savepoint_%s' % self.__savepoint_seq
return name
def _rollback_to_savepoint_impl(self, name, context):
+ if self._has_events:
+ self.engine.dispatch.rollback_savepoint(self, name, context)
+
if self._connection_is_valid:
self.engine.dialect.do_rollback_to_savepoint(self, name)
self.__transaction = context
def _release_savepoint_impl(self, name, context):
+ if self._has_events:
+ self.engine.dispatch.release_savepoint(self, name, context)
+
if self._connection_is_valid:
self.engine.dialect.do_release_savepoint(self, name)
self.__transaction = context
def _begin_twophase_impl(self, xid):
+ if self._has_events:
+ self.engine.dispatch.begin_twophase(self, xid)
+
if self._connection_is_valid:
self.engine.dialect.do_begin_twophase(self, xid)
def _prepare_twophase_impl(self, xid):
+ if self._has_events:
+ self.engine.dispatch.prepare_twophase(self, xid)
+
if self._connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_prepare_twophase(self, xid)
def _rollback_twophase_impl(self, xid, is_prepared):
+ if self._has_events:
+ self.engine.dispatch.rollback_twophase(self, xid, is_prepared)
+
if self._connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_rollback_twophase(self, xid, is_prepared)
self.__transaction = None
def _commit_twophase_impl(self, xid, is_prepared):
+ if self._has_events:
+ self.engine.dispatch.commit_twophase(self, xid, is_prepared)
+
if self._connection_is_valid:
assert isinstance(self.__transaction, TwoPhaseTransaction)
self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
* a :class:`.Compiled` object
"""
-
for c in type(object).__mro__:
if c in Connection.executors:
return Connection.executors[c](
def _execute_default(self, default, multiparams, params):
"""Execute a schema.ColumnDefault object."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ default, multiparams, params = \
+ fn(self, default, multiparams, params)
+
try:
try:
conn = self.__connection
ret = ctx._exec_default(default, None)
if self.should_close_with_result:
self.close()
+
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ default, multiparams, params, ret)
+
return ret
- def _execute_ddl(self, ddl, params, multiparams):
+ def _execute_ddl(self, ddl, multiparams, params):
"""Execute a schema.DDL object."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ ddl, multiparams, params = \
+ fn(self, ddl, multiparams, params)
+
dialect = self.dialect
compiled = ddl.compile(dialect=dialect)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_ddl,
compiled,
None,
compiled
)
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ ddl, multiparams, params, ret)
+ return ret
def _execute_clauseelement(self, elem, multiparams, params):
"""Execute a sql.ClauseElement object."""
- params = self.__distill_params(multiparams, params)
- if params:
- keys = params[0].keys()
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ elem, multiparams, params = \
+ fn(self, elem, multiparams, params)
+
+ distilled_params = self.__distill_params(multiparams, params)
+ if distilled_params:
+ keys = distilled_params[0].keys()
else:
keys = []
dialect = self.dialect
if 'compiled_cache' in self._execution_options:
- key = dialect, elem, tuple(keys), len(params) > 1
+ key = dialect, elem, tuple(keys), len(distilled_params) > 1
if key in self._execution_options['compiled_cache']:
compiled_sql = self._execution_options['compiled_cache'][key]
else:
compiled_sql = elem.compile(
dialect=dialect, column_keys=keys,
- inline=len(params) > 1)
+ inline=len(distilled_params) > 1)
self._execution_options['compiled_cache'][key] = compiled_sql
else:
compiled_sql = elem.compile(
dialect=dialect, column_keys=keys,
- inline=len(params) > 1)
+ inline=len(distilled_params) > 1)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_compiled,
compiled_sql,
- params,
- compiled_sql, params
+ distilled_params,
+ compiled_sql, distilled_params
)
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ elem, multiparams, params, ret)
+ return ret
def _execute_compiled(self, compiled, multiparams, params):
"""Execute a sql.Compiled object."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ compiled, multiparams, params = \
+ fn(self, compiled, multiparams, params)
+
dialect = self.dialect
parameters=self.__distill_params(multiparams, params)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_compiled,
compiled,
parameters,
compiled, parameters
)
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ compiled, multiparams, params, ret)
+ return ret
def _execute_text(self, statement, multiparams, params):
"""Execute a string SQL statement."""
+ if self._has_events:
+ for fn in self.engine.dispatch.before_execute:
+ statement, multiparams, params = \
+ fn(self, statement, multiparams, params)
+
dialect = self.dialect
parameters = self.__distill_params(multiparams, params)
- return self._execute_context(
+ ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_statement,
statement,
parameters,
statement, parameters
)
-
- _before_cursor_execute = None
- _after_cursor_execute = None
+ if self._has_events:
+ self.engine.dispatch.after_execute(self,
+ statement, multiparams, params, ret)
+ return ret
def _execute_context(self, dialect, constructor,
statement, parameters,
if not context.executemany:
parameters = parameters[0]
- if self._before_cursor_execute:
- statement, parameters = self._before_cursor_execute(
- context,
- cursor,
- statement,
- parameters)
+ if self._has_events:
+ for fn in self.engine.dispatch.before_cursor_execute:
+ statement, parameters = \
+ fn(self, cursor, statement, parameters,
+ context, context.executemany)
if self._echo:
self.engine.logger.info(statement)
raise
- if self._after_cursor_execute:
- self._after_cursor_execute(context, cursor,
- statement, parameters)
+ if self._has_events:
+ self.engine.dispatch.after_cursor_execute(self, cursor,
+ statement,
+ parameters,
+ context,
+ context.executemany)
if context.compiled:
context.post_exec()
"""
_execution_options = util.immutabledict()
+ _has_events = False
Connection = Connection
def __init__(self, pool, dialect, url,
)
self.update_execution_options(**execution_options)
-
- dispatch = event.dispatcher(events.EngineEvents)
+ dispatch = event.dispatcher(events.ConnectionEvents)
def update_execution_options(self, **opt):
"""update the execution_options dictionary of this :class:`Engine`.
return self.pool.unique_connection()
-def _listener_connection_cls(cls, dispatch):
- """Produce a wrapper for :class:`.Connection` which will apply event
- dispatch to each method.
-
- :class:`.Connection` does not provide event dispatch built in so that
- method call overhead is avoided in the absense of any listeners.
-
- """
- class EventListenerConnection(cls):
- def execute(self, clauseelement, *multiparams, **params):
- for fn in dispatch.before_execute:
- clauseelement, multiparams, params = \
- fn(self, clauseelement, multiparams, params)
-
- ret = super(EventListenerConnection, self).\
- execute(clauseelement, *multiparams, **params)
-
- for fn in dispatch.after_execute:
- fn(self, clauseelement, multiparams, params, ret)
-
- return ret
-
- def _execute_clauseelement(self, clauseelement,
- multiparams=None, params=None):
- return self.execute(clauseelement,
- *(multiparams or []),
- **(params or {}))
-
- def _before_cursor_execute(self, context, cursor,
- statement, parameters):
- for fn in dispatch.before_cursor_execute:
- statement, parameters = \
- fn(self, cursor, statement, parameters,
- context, context.executemany)
- return statement, parameters
-
- def _after_cursor_execute(self, context, cursor,
- statement, parameters):
- dispatch.after_cursor_execute(self, cursor,
- statement,
- parameters,
- context,
- context.executemany)
-
- def _begin_impl(self):
- dispatch.begin(self)
- return super(EventListenerConnection, self).\
- _begin_impl()
-
- def _rollback_impl(self):
- dispatch.rollback(self)
- return super(EventListenerConnection, self).\
- _rollback_impl()
-
- def _commit_impl(self):
- dispatch.commit(self)
- return super(EventListenerConnection, self).\
- _commit_impl()
-
- def _savepoint_impl(self, name=None):
- dispatch.savepoint(self, name)
- return super(EventListenerConnection, self).\
- _savepoint_impl(name=name)
-
- def _rollback_to_savepoint_impl(self, name, context):
- dispatch.rollback_savepoint(self, name, context)
- return super(EventListenerConnection, self).\
- _rollback_to_savepoint_impl(name, context)
-
- def _release_savepoint_impl(self, name, context):
- dispatch.release_savepoint(self, name, context)
- return super(EventListenerConnection, self).\
- _release_savepoint_impl(name, context)
-
- def _begin_twophase_impl(self, xid):
- dispatch.begin_twophase(self, xid)
- return super(EventListenerConnection, self).\
- _begin_twophase_impl(xid)
-
- def _prepare_twophase_impl(self, xid):
- dispatch.prepare_twophase(self, xid)
- return super(EventListenerConnection, self).\
- _prepare_twophase_impl(xid)
-
- def _rollback_twophase_impl(self, xid, is_prepared):
- dispatch.rollback_twophase(self, xid)
- return super(EventListenerConnection, self).\
- _rollback_twophase_impl(xid, is_prepared)
-
- def _commit_twophase_impl(self, xid, is_prepared):
- dispatch.commit_twophase(self, xid, is_prepared)
- return super(EventListenerConnection, self).\
- _commit_twophase_impl(xid, is_prepared)
-
- return EventListenerConnection
# This reconstructor is necessary so that pickles with the C extension or
# without use the same Binary format.
-from test.lib.testing import eq_, assert_raises, assert_raises_message
+from test.lib.testing import eq_, assert_raises, assert_raises_message, config
import re
from sqlalchemy.interfaces import ConnectionProxy
from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \
- bindparam, select, event, TypeDecorator
+ bindparam, select, event, TypeDecorator, create_engine
from sqlalchemy.sql import column, literal
from test.lib.schema import Table, Column
import sqlalchemy as tsa
import logging
from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
from sqlalchemy.engine import base, default
+from sqlalchemy.engine.base import Connection, Engine
users, metadata = None, None
class ExecuteTest(TestBase):
self._test_proxy(base.BufferedColumnResultProxy)
class EngineEventsTest(TestBase):
+ def tearDown(self):
+ Engine.dispatch._clear()
def _assert_stmts(self, expected, received):
for stmt, params, posn in expected:
== params or testparams == posn):
break
+ def test_per_engine_independence(self):
+ e1 = create_engine(config.db_url)
+ e2 = create_engine(config.db_url)
+
+ canary = []
+ def before_exec(conn, stmt, *arg):
+ canary.append(stmt)
+ event.listen(e1, "before_execute", before_exec)
+ s1 = select([1])
+ s2 = select([2])
+ e1.execute(s1)
+ e2.execute(s2)
+ eq_(canary, [s1])
+ event.listen(e2, "before_execute", before_exec)
+ e1.execute(s1)
+ e2.execute(s2)
+ eq_(canary, [s1, s1, s2])
+
+ def test_per_engine_plus_global(self):
+ canary = []
+ def be1(conn, stmt, *arg):
+ canary.append('be1')
+ def be2(conn, stmt, *arg):
+ canary.append('be2')
+ def be3(conn, stmt, *arg):
+ canary.append('be3')
+
+ event.listen(Engine, "before_execute", be1)
+ e1 = create_engine(config.db_url)
+ e2 = create_engine(config.db_url)
+
+ event.listen(e1, "before_execute", be2)
+
+ event.listen(Engine, "before_execute", be3)
+ e1.connect()
+ e2.connect()
+ canary[:] = []
+ e1.execute(select([1]))
+ e2.execute(select([1]))
+
+ eq_(canary, ['be1', 'be3', 'be2', 'be1', 'be3'])
+
+ def test_argument_format_execute(self):
+ def before_execute(conn, clauseelement, multiparams, params):
+ assert isinstance(multiparams, (list, tuple))
+ assert isinstance(params, dict)
+ def after_execute(conn, clauseelement, multiparams, params, result):
+ assert isinstance(multiparams, (list, tuple))
+ assert isinstance(params, dict)
+ e1 = create_engine(config.db_url)
+ event.listen(e1, 'before_execute', before_execute)
+ event.listen(e1, 'after_execute', after_execute)
+
+ e1.execute(select([1]))
+ e1.execute(select([1]).compile(dialect=e1.dialect).statement)
+ e1.execute(select([1]).compile(dialect=e1.dialect))
+ e1._execute_compiled(select([1]).compile(dialect=e1.dialect), [], {})
+
@testing.fails_on('firebird', 'Data type unknown')
def test_execute_events(self):
canary, ['execute', 'cursor_execute']
)
-
-
def test_transactional(self):
canary = []
def tracker(name):