.. changelog::
:version: 0.9.4
+ .. change::
+ :tags: feature, engine
+
+ Added some new event mechanics for dialect-level events; the initial
+ implementation allows an event handler to redefine the specific mechanics
+ by which an arbitrary dialect invokes execute() or executemany() on a
+ DBAPI cursor. The new events, at this point semi-public and experimental,
+ are in support of some upcoming transaction-related extensions.
+
.. change::
:tags: feature, engine
:tickets: 2978
.. autoclass:: sqlalchemy.events.ConnectionEvents
:members:
+.. autoclass:: sqlalchemy.events.DialectEvents
+ :members:
+
Schema Events
-----------------------
sql_util._repr_params(parameters, batches=10))
try:
if context.executemany:
- self.dialect.do_executemany(
- cursor,
- statement,
- parameters,
- context)
+ for fn in () if not self.dialect._has_events \
+ else self.dialect.dispatch.do_executemany:
+ if fn(cursor, statement, parameters, context):
+ break
+ else:
+ self.dialect.do_executemany(
+ cursor,
+ statement,
+ parameters,
+ context)
+
elif not parameters and context.no_parameters:
- self.dialect.do_execute_no_params(
- cursor,
- statement,
- context)
+ for fn in () if not self.dialect._has_events \
+ else self.dialect.dispatch.do_execute_no_params:
+ if fn(cursor, statement, context):
+ break
+ else:
+ self.dialect.do_execute_no_params(
+ cursor,
+ statement,
+ context)
+
else:
- self.dialect.do_execute(
- cursor,
- statement,
- parameters,
- context)
+ for fn in () if not self.dialect._has_events \
+ else self.dialect.dispatch.do_execute:
+ if fn(cursor, statement, parameters, context):
+ break
+ else:
+ self.dialect.do_execute(
+ cursor,
+ statement,
+ parameters,
+ context)
except Exception as e:
self._handle_dbapi_exception(
e,
self.engine.logger.info(statement)
self.engine.logger.info("%r", parameters)
try:
- self.dialect.do_execute(
- cursor,
- statement,
- parameters,
- context)
+ for fn in () if not self.dialect._has_events \
+ else self.dialect.dispatch.do_execute:
+ if fn(cursor, statement, parameters, context):
+ break
+ else:
+ self.dialect.do_execute(
+ cursor,
+ statement,
+ parameters,
+ context)
except Exception as e:
self._handle_dbapi_exception(
e,
"""
+ _has_events = False
+
+
def create_connect_args(self, url):
"""Build DB-API compatible connection arguments.
from . import event, exc
from .pool import Pool
-from .engine import Connectable, Engine
+from .engine import Connectable, Engine, Dialect
from .sql.base import SchemaEventTarget
class DDLEvents(event.Events):
:meth:`.TwoPhaseTransaction.prepare` was called.
"""
+
+
+class DialectEvents(event.Events):
+ """event interface for execution-replacement functions.
+
+ These events allow direct instrumentation and replacement
+ of key dialect functions which interact with the DBAPI.
+
+ .. note::
+
+ :class:`.DialectEvents` hooks should be considered **semi-public**
+ and experimental.
+ These hooks are not for general use and are only for those situations where
+ intricate re-statement of DBAPI mechanics must be injected onto an existing
+ dialect. For general-use statement-interception events, please
+ use the :class:`.ConnectionEvents` interface.
+
+ .. seealso::
+
+ :meth:`.ConnectionEvents.before_cursor_execute`
+
+ :meth:`.ConnectionEvents.before_execute`
+
+ :meth:`.ConnectionEvents.after_cursor_execute`
+
+ :meth:`.ConnectionEvents.after_execute`
+
+
+ .. versionadded:: 0.9.4
+
+ """
+
+ _target_class_doc = "SomeEngine"
+ _dispatch_target = Dialect
+
+ @classmethod
+ def _listen(cls, event_key, retval=False):
+ target, identifier, fn = \
+ event_key.dispatch_target, event_key.identifier, event_key.fn
+
+ target._has_events = True
+ event_key.base_listen()
+
+ @classmethod
+ def _accept_with(cls, target):
+ if isinstance(target, type):
+ if issubclass(target, Engine):
+ return Dialect
+ elif issubclass(target, Dialect):
+ return target
+ elif isinstance(target, Engine):
+ return target.dialect
+ else:
+ return target
+
+ def do_executemany(self, cursor, statement, parameters, context):
+ """Receive a cursor to have executemany() called.
+
+ Return the value True to halt further events from invoking,
+ and to indicate that the cursor execution has already taken
+ place within the event handler.
+
+ """
+
+ def do_execute_no_params(self, cursor, statement, context):
+ """Receive a cursor to have execute() with no parameters called.
+
+ Return the value True to halt further events from invoking,
+ and to indicate that the cursor execution has already taken
+ place within the event handler.
+
+ """
+
+ def do_execute(self, cursor, statement, parameters, context):
+ """Receive a cursor to have execute() called.
+
+ Return the value True to halt further events from invoking,
+ and to indicate that the cursor execution has already taken
+ place within the event handler.
+
+ """
+
from sqlalchemy.engine.base import Engine
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.mock import Mock, call, patch
-
+from contextlib import contextmanager
users, metadata, users_autoinc = None, None, None
class ExecuteTest(fixtures.TestBase):
'prepare_twophase', 'commit_twophase']
)
+class DialectEventTest(fixtures.TestBase):
+ @contextmanager
+ def _run_test(self, retval):
+ m1 = Mock()
+
+ m1.do_execute.return_value = retval
+ m1.do_executemany.return_value = retval
+ m1.do_execute_no_params.return_value = retval
+ e = engines.testing_engine(options={"_initialize": False})
+
+ event.listen(e, "do_execute", m1.do_execute)
+ event.listen(e, "do_executemany", m1.do_executemany)
+ event.listen(e, "do_execute_no_params", m1.do_execute_no_params)
+
+ e.dialect.do_execute = m1.real_do_execute
+ e.dialect.do_executemany = m1.real_do_executemany
+ e.dialect.do_execute_no_params = m1.real_do_execute_no_params
+
+ with e.connect() as conn:
+ yield conn, m1
+
+ def _assert(self, retval, m1, m2, mock_calls):
+ eq_(m1.mock_calls, mock_calls)
+ if retval:
+ eq_(m2.mock_calls, [])
+ else:
+ eq_(m2.mock_calls, mock_calls)
+
+ def _test_do_execute(self, retval):
+ with self._run_test(retval) as (conn, m1):
+ result = conn.execute("insert into table foo", {"foo": "bar"})
+ self._assert(
+ retval,
+ m1.do_execute, m1.real_do_execute,
+ [call(
+ result.context.cursor,
+ "insert into table foo",
+ {"foo": "bar"}, result.context)]
+ )
+
+ def _test_do_executemany(self, retval):
+ with self._run_test(retval) as (conn, m1):
+ result = conn.execute("insert into table foo",
+ [{"foo": "bar"}, {"foo": "bar"}])
+ self._assert(
+ retval,
+ m1.do_executemany, m1.real_do_executemany,
+ [call(
+ result.context.cursor,
+ "insert into table foo",
+ [{"foo": "bar"}, {"foo": "bar"}], result.context)]
+ )
+
+ def _test_do_execute_no_params(self, retval):
+ with self._run_test(retval) as (conn, m1):
+ result = conn.execution_options(no_parameters=True).\
+ execute("insert into table foo")
+ self._assert(
+ retval,
+ m1.do_execute_no_params, m1.real_do_execute_no_params,
+ [call(
+ result.context.cursor,
+ "insert into table foo", result.context)]
+ )
+
+ def _test_cursor_execute(self, retval):
+ with self._run_test(retval) as (conn, m1):
+ dialect = conn.dialect
+
+ stmt = "insert into table foo"
+ params = {"foo": "bar"}
+ ctx = dialect.execution_ctx_cls._init_statement(
+ dialect, conn, conn.connection, stmt, [params])
+
+ conn._cursor_execute(ctx.cursor, stmt, params, ctx)
+
+ self._assert(
+ retval,
+ m1.do_execute, m1.real_do_execute,
+ [call(
+ ctx.cursor,
+ "insert into table foo",
+ {"foo": "bar"}, ctx)]
+ )
+
+ def test_do_execute_w_replace(self):
+ self._test_do_execute(True)
+
+ def test_do_execute_wo_replace(self):
+ self._test_do_execute(False)
+
+ def test_do_executemany_w_replace(self):
+ self._test_do_executemany(True)
+
+ def test_do_executemany_wo_replace(self):
+ self._test_do_executemany(False)
+
+ def test_do_execute_no_params_w_replace(self):
+ self._test_do_execute_no_params(True)
+
+ def test_do_execute_no_params_wo_replace(self):
+ self._test_do_execute_no_params(False)
+
+ def test_cursor_execute_w_replace(self):
+ self._test_cursor_execute(True)
+
+ def test_cursor_execute_wo_replace(self):
+ self._test_cursor_execute(False)
+