types
event
events
- interfaces
exceptions
compiler
serializer
+ interfaces
\ No newline at end of file
-.. _interfaces_core_toplevel:
+.. _dep_interfaces_core_toplevel:
Deprecated Event Interfaces
============================
.. module:: sqlalchemy.interfaces
-This section describes the class-based event interface introduced in
-SQLAlchemy 0.5. As of SQLAlchemy 0.7, the new event system described in
-:ref:`event_toplevel` should be used.
+This section describes the class-based core event interface introduced in
+SQLAlchemy 0.5. The ORM analogue is described at :ref:`dep_interfaces_orm_toplevel`.
+
+As of SQLAlchemy 0.7, the new event system described in
+:ref:`event_toplevel` replaces the extension/proxy/listener system, providing
+a consistent interface to all events without the need for subclassing.
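+
+For example, rather than subclassing an extension class, a plain function
+can be attached to an event target with :func:`.event.listen` (a minimal
+sketch using the ``on_checkout`` pool event; the ``listen(fn, identifier,
+target)`` argument order matches the other examples in this document)::
+
+    from sqlalchemy import create_engine, event
+
+    def my_on_checkout(dbapi_conn, connection_rec, connection_proxy):
+        "handle an on checkout event"
+
+    engine = create_engine("postgresql://scott:tiger@localhost/test")
+
+    # an Engine target resolves to its connection pool
+    event.listen(my_on_checkout, 'on_checkout', engine)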
Execution, Connection and Cursor Events
---------------------------------------
based on any condition, either accompanying the standard generation of tables
or by itself.
+.. _schema_ddl_sequences:
+
Controlling DDL Sequences
-------------------------
So far, the effect is the same. However, if we create DDL elements
corresponding to the creation and removal of this constraint, and associate
-them with the :class:`~sqlalchemy.schema.Table` as events, these new events
+them with the :class:`.Table` as events, these new events
will take over the job of issuing DDL for the constraint. Additionally, the
constraint will be added via ALTER:
.. sourcecode:: python+sql
-
- AddConstraint(constraint).execute_at("after-create", users)
- DropConstraint(constraint).execute_at("before-drop", users)
+
+ from sqlalchemy import event
+
+ event.listen(
+ AddConstraint(constraint),
+ "on_after_create",
+ users
+ )
+ event.listen(
+ DropConstraint(constraint),
+ "on_before_drop",
+ users
+ )
{sql}users.create(engine)
CREATE TABLE users (
ALTER TABLE users DROP CONSTRAINT cst_user_name_length
DROP TABLE users{stop}
-The real usefulness of the above becomes clearer once we illustrate the ``on``
-attribute of a DDL event. The ``on`` parameter is part of the constructor, and
-may be a string name of a database dialect name, a tuple containing dialect
-names, or a Python callable. This will limit the execution of the item to just
-those dialects, or when the return value of the callable is ``True``. So if
-our :class:`~sqlalchemy.schema.CheckConstraint` was only supported by
-Postgresql and not other databases, we could limit it to just that dialect::
-
- AddConstraint(constraint, on='postgresql').execute_at("after-create", users)
- DropConstraint(constraint, on='postgresql').execute_at("before-drop", users)
+The real usefulness of the above becomes clearer once we illustrate the :meth:`.DDLElement.execute_if`
+method. This method returns a modified form of the DDL callable which will
+filter on criteria before responding to a received event. It accepts a
+parameter ``dialect``, which is the string name of a dialect or a tuple of such,
+which will limit the execution of the item to just those dialects. It also
+accepts a ``callable_`` parameter which may reference a Python callable which will
+be invoked upon event reception, returning ``True`` or ``False`` indicating if
+the event should proceed.
+
+If our :class:`~sqlalchemy.schema.CheckConstraint` was only supported by
+Postgresql and not other databases, we could limit its usage to just that dialect::
+
+ event.listen(
+ AddConstraint(constraint).execute_if(dialect='postgresql'),
+ 'on_after_create',
+ users
+ )
+ event.listen(
+ DropConstraint(constraint).execute_if(dialect='postgresql'),
+ 'on_before_drop',
+ users
+ )
Or to any set of dialects::
+
+ event.listen(
+ AddConstraint(constraint).execute_if(dialect=('postgresql', 'mysql')),
+ "on_after_create",
+ users
+ )
+ event.listen(
+ DropConstraint(constraint).execute_if(dialect=('postgresql', 'mysql')),
+ "on_before_drop",
+ users
+ )
- AddConstraint(constraint, on=('postgresql', 'mysql')).execute_at("after-create", users)
- DropConstraint(constraint, on=('postgresql', 'mysql')).execute_at("before-drop", users)
-
-When using a callable, the callable is passed the ddl element, event name, the
-:class:`~sqlalchemy.schema.Table` or :class:`~sqlalchemy.schema.MetaData`
+When using a callable, the callable is passed the ddl element, the
+:class:`.Table` or :class:`.MetaData`
object whose "create" or "drop" event is in progress, and the
-:class:`~sqlalchemy.engine.base.Connection` object being used for the
+:class:`.Connection` object being used for the
operation, as well as additional information as keyword arguments. The
callable can perform checks, such as whether or not a given item already
exists. Below we define ``should_create()`` and ``should_drop()`` callables
.. sourcecode:: python+sql
- def should_create(ddl, event, target, connection, **kw):
+ def should_create(ddl, target, connection, **kw):
row = connection.execute("select conname from pg_constraint where conname='%s'" % ddl.element.name).scalar()
return not bool(row)
- def should_drop(ddl, event, target, connection, **kw):
- return not should_create(ddl, event, target, connection, **kw)
+ def should_drop(ddl, target, connection, **kw):
+ return not should_create(ddl, target, connection, **kw)
- AddConstraint(constraint, on=should_create).execute_at("after-create", users)
- DropConstraint(constraint, on=should_drop).execute_at("before-drop", users)
+ event.listen(
+ AddConstraint(constraint).execute_if(callable_=should_create),
+ "on_after_create",
+ users
+ )
+ event.listen(
+ DropConstraint(constraint).execute_if(callable_=should_drop),
+ "on_before_drop",
+ users
+ )
{sql}users.create(engine)
CREATE TABLE users (
other DDL elements except it accepts a string which is the text to be emitted:
.. sourcecode:: python+sql
-
- DDL("ALTER TABLE users ADD CONSTRAINT "
- "cst_user_name_length "
- " CHECK (length(user_name) >= 8)").execute_at("after-create", metadata)
+
+ event.listen(
+ DDL("ALTER TABLE users ADD CONSTRAINT "
+ "cst_user_name_length "
+ " CHECK (length(user_name) >= 8)"),
+ "on_after_create",
+ metadata
+ )
A more comprehensive method of creating libraries of DDL constructs is to use
custom compilation - see :ref:`sqlalchemy.ext.compiler_toplevel` for
session
query
loading
- interfaces
+ events
exceptions
extensions/index
examples
+ interfaces
\ No newline at end of file
-.. _interfaces_orm_toplevel:
+.. _dep_interfaces_orm_toplevel:
Deprecated ORM Event Interfaces
================================
.. module:: sqlalchemy.orm.interfaces
-This section describes the various categories of events which can be intercepted
-within the SQLAlchemy ORM.
+This section describes the class-based ORM event interface which first
+appeared in SQLAlchemy 0.1 and gained additional kinds of events through
+SQLAlchemy 0.5. The non-ORM analogue is described at :ref:`dep_interfaces_core_toplevel`.
-For non-ORM event documentation, see :ref:`interfaces_core_toplevel`.
-
-A new version of this API with a significantly more flexible and consistent
-interface will be available in version 0.7.
+As of SQLAlchemy 0.7, the new event system described in
+:ref:`event_toplevel` replaces the extension/proxy/listener system, providing
+a consistent interface to all events without the need for subclassing.
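+
+For example, a mapper-level listener can be attached with
+:func:`.event.listen` in place of a :class:`.MapperExtension` subclass
+passed to :func:`.mapper` (a minimal sketch, assuming a mapped ``User``
+class; the ``on_before_insert`` identifier and the
+``(mapper, connection, target)`` listener signature follow the usage shown
+elsewhere in this document)::
+
+    from sqlalchemy import event
+
+    def my_before_insert(mapper, connection, target):
+        "handle a before_insert event on User"
+
+    event.listen(my_before_insert, 'on_before_insert', User)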
Mapper Events
-----------------
"""
class EventListenerConnection(cls):
def execute(self, clauseelement, *multiparams, **params):
- if dispatch.on_before_execute:
- for fn in dispatch.on_before_execute:
- result = fn(self, clauseelement, multiparams, params)
- if result:
- clauseelement, multiparams, params = result
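+ # each on_before_execute listener returns the statement and parameters,
+ # possibly replaced; listeners registered without retval=True are wrapped
+ # by EngineEvents.listen() so that they return them unchanged.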
+ for fn in dispatch.on_before_execute:
+ clauseelement, multiparams, params = \
+ fn(self, clauseelement, multiparams, params)
- ret = super(EventListenerConnection, self).execute(clauseelement, *multiparams, **params)
+ ret = super(EventListenerConnection, self).\
+ execute(clauseelement, *multiparams, **params)
- if dispatch.on_after_execute:
- for fn in dispatch.on_after_execute:
- fn(self, clauseelement, multiparams, params, ret)
+ for fn in dispatch.on_after_execute:
+ fn(self, clauseelement, multiparams, params, ret)
return ret
- def _execute_clauseelement(self, clauseelement, multiparams=None, params=None):
- return self.execute(clauseelement, *(multiparams or []), **(params or {}))
+ def _execute_clauseelement(self, clauseelement,
+ multiparams=None, params=None):
+ return self.execute(clauseelement,
+ *(multiparams or []),
+ **(params or {}))
def _cursor_execute(self, cursor, statement,
parameters, context=None):
- if dispatch.on_before_cursor_execute:
- for fn in dispatch.on_before_cursor_execute:
- result = fn(self, cursor, statement, parameters, context, False)
- if result:
- statement, parameters = result
+ for fn in dispatch.on_before_cursor_execute:
+ statement, parameters = \
+ fn(self, cursor, statement, parameters,
+ context, False)
ret = super(EventListenerConnection, self).\
- _cursor_execute(cursor, statement, parameters, context)
+ _cursor_execute(cursor, statement, parameters,
+ context)
- if dispatch.on_after_cursor_execute:
- for fn in dispatch.on_after_cursor_execute:
- fn(self, cursor, statement, parameters, context, False)
+ for fn in dispatch.on_after_cursor_execute:
+ fn(self, cursor, statement, parameters, context, False)
return ret
def _cursor_executemany(self, cursor, statement,
parameters, context=None):
for fn in dispatch.on_before_cursor_execute:
- result = fn(self, cursor, statement, parameters, context, True)
- if result:
- statement, parameters = result
+ statement, parameters = \
+ fn(self, cursor, statement, parameters,
+ context, True)
ret = super(EventListenerConnection, self).\
- _cursor_executemany(cursor, statement, parameters, context)
+ _cursor_executemany(cursor, statement,
+ parameters, context)
for fn in dispatch.on_after_cursor_execute:
fn(self, cursor, statement, parameters, context, True)
"""Core event interfaces."""
-from sqlalchemy import event
+from sqlalchemy import event, exc
class DDLEvents(event.Events):
"""
See also:
- :mod:`sqlalchemy.event`
+ :ref:`event_toplevel`
+
+ :ref:`schema_ddl_sequences`
"""
e.g.::
- from sqlalchemy import events
+ from sqlalchemy import event
def my_on_checkout(dbapi_conn, connection_rec, connection_proxy):
"handle an on checkout event"
event.listen(my_on_checkout, 'on_checkout', Pool)
- In addition to the :class:`.Pool` class and :class:`.Pool` instances,
+ In addition to accepting the :class:`.Pool` class and :class:`.Pool` instances,
:class:`.PoolEvents` also accepts :class:`.Engine` objects and
the :class:`.Engine` class as targets, which will be resolved
to the ``.pool`` attribute of the given engine or the :class:`.Pool`
- class.
+ class::
+
+ engine = create_engine("postgresql://scott:tiger@localhost/test")
+
+ # will associate with engine.pool
+ event.listen(my_on_checkout, 'on_checkout', engine)
"""
"""
class EngineEvents(event.Events):
- """Available events for :class:`.Engine`."""
+ """Available events for :class:`.Engine`.
+
+ The methods here define the name of an event as well as the names of
+ members that are passed to listener functions.
+
+ e.g.::
+
+ from sqlalchemy import event, create_engine
+
+ def on_before_execute(conn, clauseelement, multiparams, params):
+ log.info("Received statement: %s" % clauseelement)
+
+ engine = create_engine('postgresql://scott:tiger@localhost/test')
+ event.listen(on_before_execute, "on_before_execute", engine)
+
+ Some events allow modifiers to the listen() function.
+
+ :param retval=False: Applies to the :meth:`.on_before_execute` and
+ :meth:`.on_before_cursor_execute` events only. When True, the
+ user-defined event function must have a return value, which
+ is a tuple of parameters that replace the given statement
+ and parameters. See those methods for a description of
+ specific return arguments.
+
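+ For example, a listener registered with ``retval=True`` must return the
+ statement and parameters, which it may replace (a minimal sketch; this
+ listener simply passes them through unchanged)::
+
+     def on_before_execute(conn, clauseelement, multiparams, params):
+         # inspect or swap out the statement here
+         return clauseelement, multiparams, params
+
+     event.listen(on_before_execute, "on_before_execute", engine, retval=True)
+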
+ """
@classmethod
- def listen(cls, fn, identifier, target):
+ def listen(cls, fn, identifier, target, retval=False):
from sqlalchemy.engine.base import Connection, \
_listener_connection_cls
if target.Connection is Connection:
target.Connection = _listener_connection_cls(
Connection,
target.dispatch)
+
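+ # by default (retval=False), wrap the listener so that the original
+ # statement and parameters are returned unchanged; the listener itself
+ # does not need to return anything.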
+ if not retval:
+ if identifier == 'on_before_execute':
+ orig_fn = fn
+ def wrap(conn, clauseelement, multiparams, params):
+ orig_fn(conn, clauseelement, multiparams, params)
+ return clauseelement, multiparams, params
+ fn = wrap
+ elif identifier == 'on_before_cursor_execute':
+ orig_fn = fn
+ def wrap(conn, cursor, statement,
+ parameters, context, executemany):
+ orig_fn(conn, cursor, statement,
+ parameters, context, executemany)
+ return statement, parameters
+ fn = wrap
+
+ elif retval and identifier not in ('on_before_execute', 'on_before_cursor_execute'):
+ raise exc.ArgumentError(
+ "Only the 'on_before_execute' and "
+ "'on_before_cursor_execute' engine "
+ "event listeners accept the 'retval=True' "
+ "argument.")
event.Events.listen(fn, identifier, target)
def on_before_execute(self, conn, clauseelement, multiparams, params):
"""Hooks into the lifecycle of connections in a :class:`Pool`.
.. note:: :class:`PoolListener` is deprecated. Please
- refer to :func:`event.listen` as well as
- :class:`.PoolEvents`.
+ refer to :class:`.PoolEvents`.
Usage::
"""Allows interception of statement execution by Connections.
.. note:: :class:`ConnectionProxy` is deprecated. Please
- refer to :func:`event.listen` as well as
- :attr:`.Engine.events`.
+ refer to :class:`.EngineEvents`.
Either or both of the ``execute()`` and ``cursor_execute()``
may be implemented to intercept compiled statement and
:param raw=False: When True, the "target" argument to the
event, if applicable will be the :class:`.InstanceState` management
object, rather than the mapped instance itself.
- :param retval=False: when True, the user-defined event listening
+ :param retval=False: when True, the user-defined event function
must have a return value, the purpose of which is either to
control subsequent event propagation, or to otherwise alter
the operation in progress by the mapper. Possible values
# the MIT License: http://www.opensource.org/licenses/mit-license.php
import sqlalchemy.exceptions as sa_exc
-from sqlalchemy.util import ScopedRegistry, ThreadLocalRegistry, \
- to_list, get_cls_kwargs, deprecated,\
- warn
-from sqlalchemy.orm import (
- EXT_CONTINUE, MapperExtension, class_mapper, object_session
- )
+from sqlalchemy.util import ScopedRegistry, ThreadLocalRegistry, warn
+from sqlalchemy.orm import class_mapper
from sqlalchemy.orm import exc as orm_exc
from sqlalchemy.orm.session import Session
self.registry = ScopedRegistry(session_factory, scopefunc)
else:
self.registry = ThreadLocalRegistry(session_factory)
- self.extension = _ScopedExt(self)
def __call__(self, **kwargs):
if kwargs:
self.registry().close()
self.registry.clear()
- @deprecated("0.5", ":meth:`.ScopedSession.mapper` is deprecated. "
- "Please see http://www.sqlalchemy.org/trac/wiki/UsageRecipes/SessionAwareMapper "
- "for information on how to replicate its behavior.")
- def mapper(self, *args, **kwargs):
- """return a :func:`.mapper` function which associates this ScopedSession with the Mapper.
-
- """
-
- from sqlalchemy.orm import mapper
-
- extension_args = dict((arg, kwargs.pop(arg))
- for arg in get_cls_kwargs(_ScopedExt)
- if arg in kwargs)
-
- kwargs['extension'] = extension = to_list(kwargs.get('extension', []))
- if extension_args:
- extension.append(self.extension.configure(**extension_args))
- else:
- extension.append(self.extension)
- return mapper(*args, **kwargs)
-
def configure(self, **kwargs):
"""reconfigure the sessionmaker used by this ScopedSession."""
for prop in ('close_all', 'object_session', 'identity_key'):
setattr(ScopedSession, prop, clslevel(prop))
-class _ScopedExt(MapperExtension):
- def __init__(self, context, validate=False, save_on_init=True):
- self.context = context
- self.validate = validate
- self.save_on_init = save_on_init
- self.set_kwargs_on_init = True
-
- def validating(self):
- return _ScopedExt(self.context, validate=True)
-
- def configure(self, **kwargs):
- return _ScopedExt(self.context, **kwargs)
-
- def instrument_class(self, mapper, class_):
- class query(object):
- def __getattr__(s, key):
- return getattr(self.context.registry().query(class_), key)
- def __call__(s):
- return self.context.registry().query(class_)
- def __get__(self, instance, cls):
- return self
-
- if not 'query' in class_.__dict__:
- class_.query = query()
-
- if self.set_kwargs_on_init and class_.__init__ is object.__init__:
- class_.__init__ = self._default__init__(mapper)
-
- def _default__init__(ext, mapper):
- def __init__(self, **kwargs):
- for key, value in kwargs.iteritems():
- if ext.validate:
- if not mapper.get_property(key, resolve_synonyms=False,
- raiseerr=False):
- raise sa_exc.ArgumentError(
- "Invalid __init__ argument: '%s'" % key)
- setattr(self, key, value)
- return __init__
-
- def init_instance(self, mapper, class_, oldinit, instance, args, kwargs):
- if self.save_on_init:
- session = kwargs.pop('_sa_session', None)
- if session is None:
- session = self.context.registry()
- session._save_without_cascade(instance)
- return EXT_CONTINUE
-
- def init_failed(self, mapper, class_, oldinit, instance, args, kwargs):
- sess = object_session(instance)
- if sess:
- sess.expunge(instance)
- return EXT_CONTINUE
-
- def dispose_class(self, mapper, class_):
- if hasattr(class_, 'query'):
- delattr(class_, 'query')
mapper = object_mapper(instance)
return mapper.identity_key_from_instance(instance)
-class ExtensionCarrier(dict):
- """Fronts an ordered collection of MapperExtension objects.
-
- Bundles multiple MapperExtensions into a unified callable unit,
- encapsulating ordering, looping and EXT_CONTINUE logic. The
- ExtensionCarrier implements the MapperExtension interface, e.g.::
-
- carrier.after_insert(...args...)
-
- The dictionary interface provides containment for implemented
- method names mapped to a callable which executes that method
- for participating extensions.
-
- """
-
- interface = set(method for method in dir(MapperExtension)
- if not method.startswith('_'))
-
- def __init__(self, extensions=None):
- self._extensions = []
- for ext in extensions or ():
- self.append(ext)
-
- def copy(self):
- return ExtensionCarrier(self._extensions)
-
- def push(self, extension):
- """Insert a MapperExtension at the beginning of the collection."""
- self._register(extension)
- self._extensions.insert(0, extension)
-
- def append(self, extension):
- """Append a MapperExtension at the end of the collection."""
- self._register(extension)
- self._extensions.append(extension)
-
- def __iter__(self):
- """Iterate over MapperExtensions in the collection."""
- return iter(self._extensions)
-
- def _register(self, extension):
- """Register callable fronts for overridden interface methods."""
-
- for method in self.interface.difference(self):
- impl = getattr(extension, method, None)
- if impl and impl is not getattr(MapperExtension, method):
- self[method] = self._create_do(method)
-
- def _create_do(self, method):
- """Return a closure that loops over impls of the named method."""
-
- def _do(*args, **kwargs):
- for ext in self._extensions:
- ret = getattr(ext, method)(*args, **kwargs)
- if ret is not EXT_CONTINUE:
- return ret
- else:
- return EXT_CONTINUE
- _do.__name__ = method
- return _do
-
- @staticmethod
- def _pass(*args, **kwargs):
- return EXT_CONTINUE
-
- def __getattr__(self, key):
- """Delegate MapperExtension methods to bundled fronts."""
-
- if key not in self.interface:
- raise AttributeError(key)
- return self.get(key, self._pass)
-
class ORMAdapter(sql_util.ColumnAdapter):
"""Extends ColumnAdapter to accept ORM entities.
"DDL execution skipped, criteria not met.")
@util.deprecated("0.7", "See :class:`.DDLEvents`, as well as "
- ":meth:`.DDLEvent.execute_if`.")
+ ":meth:`.DDLElement.execute_if`.")
def execute_at(self, event_name, target):
"""Link execution of this DDL to the DDL lifecycle of a SchemaItem.
'on_before_create',
metadata
)
+
+ :param dialect: May be a string, tuple or a callable
+ predicate. If a string, it will be compared to the name of the
+ executing database dialect::
+
+ DDL('something').execute_if(dialect='postgresql')
+
+ If a tuple, specifies multiple dialect names::
+
+ DDL('something').execute_if(dialect=('postgresql', 'mysql'))
+
+ :param callable_: A callable, which will be invoked with
+ three positional arguments as well as optional keyword
+ arguments:
+
+ :ddl:
+ This DDL element.
+
+ :target:
+ The :class:`.Table` or :class:`.MetaData` object which is the target of
+ this event. May be None if the DDL is executed explicitly.
+
+ :bind:
+ The :class:`.Connection` being used for DDL execution
+
+ :tables:
+ Optional keyword argument - a list of Table objects which are to
+ be created/dropped within a MetaData.create_all() or drop_all()
+ method call.
+
+ If the callable returns a true value, the DDL statement will be
+ executed.
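+
+ e.g. (a sketch mirroring the usage shown in the schema documentation;
+ ``AddConstraint``, ``constraint``, ``users`` and the user-defined
+ ``should_create`` predicate are as in that example)::
+
+     event.listen(
+         AddConstraint(constraint).execute_if(callable_=should_create),
+         'on_after_create',
+         users
+     )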
See also:
class DDL(DDLElement):
"""A literal DDL statement.
- Specifies literal SQL DDL to be executed by the database. DDL objects can
- be attached to ``Tables`` or ``MetaData`` instances, conditionally
- executing SQL as part of the DDL lifecycle of those schema items. Basic
- templating support allows a single DDL instance to handle repetitive tasks
- for multiple tables.
+ Specifies literal SQL DDL to be executed by the database. DDL objects
+ function as DDL event listeners, and can be subscribed to those events
+ listed in :class:`.DDLEvents`, using either :class:`.Table` or :class:`.MetaData`
+ objects as targets. Basic templating support allows a single DDL instance
+ to handle repetitive tasks for multiple tables.
Examples::
+
+ from sqlalchemy import event, DDL
+
+ tbl = Table('users', metadata, Column('uid', Integer))
+ event.listen(DDL('DROP TRIGGER users_trigger'), 'on_before_create', tbl)
- tbl = Table('users', metadata, Column('uid', Integer)) # ...
- DDL('DROP TRIGGER users_trigger').execute_at('before-create', tbl)
-
- spow = DDL('ALTER TABLE %(table)s SET secretpowers TRUE', on='somedb')
- spow.execute_at('after-create', tbl)
+ spow = DDL('ALTER TABLE %(table)s SET secretpowers TRUE')
+ event.listen(spow.execute_if(dialect='somedb'), 'on_after_create', tbl)
drop_spow = DDL('ALTER TABLE users SET secretpowers FALSE')
connection.execute(drop_spow)
%(schema)s - the schema name, with any required quoting applied
%(fullname)s - the Table name including schema, quoted if needed
- The DDL's ``context``, if any, will be combined with the standard
+ The DDL's "context", if any, will be combined with the standard
substitutions noted above. Keys present in the context will override
the standard substitutions.
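+
+ For example (a sketch; ``context`` is the optional dictionary argument
+ of the :class:`.DDL` constructor and ``mood`` is a made-up key)::
+
+     spow = DDL("ALTER TABLE %(table)s SET secretpowers %(mood)s",
+                context={'mood': 'TRUE'})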
self._assert_stmts(cursor, cursor_stmts)
def test_options(self):
- track = []
+ canary = []
def on_execute(conn, *args, **kw):
- track.append('execute')
+ canary.append('execute')
def on_cursor_execute(conn, *args, **kw):
- track.append('cursor_execute')
+ canary.append('cursor_execute')
engine = engines.testing_engine()
event.listen(on_execute, 'on_before_execute', engine)
c2.execute(select([1]))
c3 = c2.execution_options(bar='bat')
eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'})
- eq_(track, ['execute', 'cursor_execute'])
+ eq_(canary, ['execute', 'cursor_execute'])
+ def test_retval_flag(self):
+ canary = []
+ def tracker(name):
+ def go(conn, *args, **kw):
+ canary.append(name)
+ return go
+ def on_execute(conn, clauseelement, multiparams, params):
+ canary.append('execute')
+ return clauseelement, multiparams, params
+
+ def on_cursor_execute(conn, cursor, statement,
+ parameters, context, executemany):
+ canary.append('cursor_execute')
+ return statement, parameters
+
+ engine = engines.testing_engine()
+
+ assert_raises(
+ tsa.exc.ArgumentError,
+ event.listen, tracker("on_begin"), "on_begin", engine, retval=True
+ )
+
+ event.listen(on_execute, "on_before_execute", engine, retval=True)
+ event.listen(on_cursor_execute, "on_before_cursor_execute", engine, retval=True)
+ engine.execute("select 1")
+ eq_(
+ canary, ['execute', 'cursor_execute']
+ )
+
def test_transactional(self):
- track = []
+ canary = []
def tracker(name):
def go(conn, *args, **kw):
- track.append(name)
+ canary.append(name)
return go
engine = engines.testing_engine()
conn.execute(select([1]))
trans.commit()
- eq_(track, [
+ eq_(canary, [
'begin', 'execute', 'cursor_execute', 'rollback',
'begin', 'execute', 'cursor_execute', 'commit',
])
@testing.requires.savepoints
@testing.requires.two_phase_transactions
def test_transactional_advanced(self):
- track = []
+ canary = []
def tracker(name):
def go(conn, exec_, *args, **kw):
- track.append(name)
+ canary.append(name)
return exec_(*args, **kw)
return go
trans.prepare()
trans.commit()
- eq_(track, ['begin', 'savepoint',
+ eq_(canary, ['begin', 'savepoint',
'rollback_savepoint', 'savepoint', 'release_savepoint',
'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']
@testing.uses_deprecated(r'.*Use event.listen')
def test_options(self):
- track = []
+ canary = []
class TrackProxy(ConnectionProxy):
def __getattribute__(self, key):
fn = object.__getattribute__(self, key)
def go(*arg, **kw):
- track.append(fn.__name__)
+ canary.append(fn.__name__)
return fn(*arg, **kw)
return go
engine = engines.testing_engine(options={'proxy':TrackProxy()})
c2.execute(select([1]))
c3 = c2.execution_options(bar='bat')
eq_(c3._execution_options, {'foo':'bar', 'bar':'bat'})
- eq_(track, ['execute', 'cursor_execute'])
+ eq_(canary, ['execute', 'cursor_execute'])
@testing.uses_deprecated(r'.*Use event.listen')
def test_transactional(self):
- track = []
+ canary = []
class TrackProxy(ConnectionProxy):
def __getattribute__(self, key):
fn = object.__getattribute__(self, key)
def go(*arg, **kw):
- track.append(fn.__name__)
+ canary.append(fn.__name__)
return fn(*arg, **kw)
return go
conn.execute(select([1]))
trans.commit()
- eq_(track, [
+ eq_(canary, [
'begin', 'execute', 'cursor_execute', 'rollback',
'begin', 'execute', 'cursor_execute', 'commit',
])
@testing.requires.savepoints
@testing.requires.two_phase_transactions
def test_transactional_advanced(self):
- track = []
+ canary = []
class TrackProxy(ConnectionProxy):
def __getattribute__(self, key):
fn = object.__getattribute__(self, key)
def go(*arg, **kw):
- track.append(fn.__name__)
+ canary.append(fn.__name__)
return fn(*arg, **kw)
return go
trans.prepare()
trans.commit()
- track = [t for t in track if t not in ('cursor_execute', 'execute')]
- eq_(track, ['begin', 'savepoint',
+ canary = [t for t in canary if t not in ('cursor_execute', 'execute')]
+ eq_(canary, ['begin', 'savepoint',
'rollback_savepoint', 'savepoint', 'release_savepoint',
'rollback', 'begin_twophase',
'prepare_twophase', 'commit_twophase']
import sqlalchemy as sa
from sqlalchemy.test import testing
-from sqlalchemy import Integer, String, ForeignKey
+from sqlalchemy import Integer, String, ForeignKey, event
from sqlalchemy.test.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, create_session
from test.orm import _base
bind.engine.name not in ('oracle', 'mssql', 'sqlite')
),
):
- ins.execute_at('after-create', dt)
-
- sa.DDL("DROP TRIGGER dt_ins").execute_at('before-drop', dt)
+ event.listen(ins, 'on_after_create', dt)
+
+ event.listen(sa.DDL("DROP TRIGGER dt_ins"), 'on_before_drop', dt)
for up in (
sa.DDL("CREATE TRIGGER dt_up AFTER UPDATE ON dt "
bind.engine.name not in ('oracle', 'mssql', 'sqlite')
),
):
- up.execute_at('after-create', dt)
+ event.listen(up, 'on_after_create', dt)
- sa.DDL("DROP TRIGGER dt_up").execute_at('before-drop', dt)
+ event.listen(sa.DDL("DROP TRIGGER dt_up"), 'on_before_drop', dt)
@classmethod
: addresses.c.user_id})
@testing.resolve_artifact_names
- def test_bad_constructor(self):
- """If the construction of a mapped class fails, the instance does not get placed in the session"""
-
- class Foo(object):
- def __init__(self, one, two, _sa_session=None):
- pass
-
- mapper(Foo, users, extension=sa.orm.scoped_session(
- create_session).extension)
-
- sess = create_session()
- assert_raises(TypeError, Foo, 'one', _sa_session=sess)
- eq_(len(list(sess)), 0)
- assert_raises(TypeError, Foo, 'one')
- Foo('one', 'two', _sa_session=sess)
- eq_(len(list(sess)), 1)
-
- @testing.resolve_artifact_names
- def test_constructor_exc_1(self):
- """Exceptions raised in the mapped class are not masked by sa decorations"""
-
- ex = AssertionError('oops')
- sess = create_session()
-
- class Foo(object):
- def __init__(self, **kw):
- raise ex
- mapper(Foo, users)
-
- try:
- Foo()
- assert False
- except Exception, e:
- assert e is ex
-
- sa.orm.clear_mappers()
- mapper(Foo, users, extension=sa.orm.scoped_session(
- create_session).extension)
- def bad_expunge(foo):
- raise Exception("this exception should be stated as a warning")
-
- sess.expunge = bad_expunge
- assert_raises(sa.exc.SAWarning, Foo, _sa_session=sess)
-
- @testing.resolve_artifact_names
- def test_constructor_exc_2(self):
- """TypeError is raised for illegal constructor args, whether or not explicit __init__ is present [ticket:908]."""
+ def test_constructor_exc(self):
+ """TypeError is raised for illegal constructor args,
+ whether or not explicit __init__ is present [ticket:908]."""
class Foo(object):
def __init__(self):
Session.configure, bind=testing.db
)
-class ScopedMapperTest(_ScopedTest):
-
- @classmethod
- def define_tables(cls, metadata):
- Table('table1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', String(30)))
- Table('table2', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('someid', None, ForeignKey('table1.id')))
-
- @classmethod
- def setup_classes(cls):
- class SomeObject(_base.ComparableEntity):
- pass
- class SomeOtherObject(_base.ComparableEntity):
- pass
-
- @classmethod
- @testing.uses_deprecated()
- @testing.resolve_artifact_names
- def setup_mappers(cls):
- Session = scoped_session(sa.orm.create_session)
- Session.mapper(SomeObject, table1, properties={
- 'options':relationship(SomeOtherObject)
- })
- Session.mapper(SomeOtherObject, table2)
-
- cls.scoping['Session'] = Session
-
- @classmethod
- @testing.resolve_artifact_names
- def insert_data(cls):
- s = SomeObject()
- s.id = 1
- s.data = 'hello'
- sso = SomeOtherObject()
- s.options.append(sso)
- Session.flush()
- Session.expunge_all()
-
- @testing.resolve_artifact_names
- def test_query(self):
- sso = SomeOtherObject.query().first()
- assert SomeObject.query.filter_by(id=1).one().options[0].id == sso.id
-
- @testing.uses_deprecated()
- @testing.resolve_artifact_names
- def test_query_compiles(self):
- class Foo(object):
- pass
- Session.mapper(Foo, table2)
- assert hasattr(Foo, 'query')
-
- ext = sa.orm.MapperExtension()
-
- class Bar(object):
- pass
- Session.mapper(Bar, table2, extension=[ext])
- assert hasattr(Bar, 'query')
-
- class Baz(object):
- pass
- Session.mapper(Baz, table2, extension=ext)
- assert hasattr(Baz, 'query')
-
- @testing.uses_deprecated()
- @testing.resolve_artifact_names
- def test_default_constructor_state_not_shared(self):
- scope = scoped_session(sa.orm.sessionmaker())
-
- class A(object):
- pass
- class B(object):
- def __init__(self):
- pass
-
- scope.mapper(A, table1)
- scope.mapper(B, table2)
-
- A(foo='bar')
- assert_raises(TypeError, B, foo='bar')
-
- scope = scoped_session(sa.orm.sessionmaker())
-
- class C(object):
- def __init__(self):
- pass
- class D(object):
- pass
-
- scope.mapper(C, table1)
- scope.mapper(D, table2)
-
- assert_raises(TypeError, C, foo='bar')
- D(foo='bar')
-
- @testing.uses_deprecated()
- @testing.resolve_artifact_names
- def test_validating_constructor(self):
- s2 = SomeObject(someid=12)
- s3 = SomeOtherObject(someid=123, bogus=345)
-
- class ValidatedOtherObject(object): pass
- Session.mapper(ValidatedOtherObject, table2, validate=True)
-
- v1 = ValidatedOtherObject(someid=12)
- assert_raises(sa.exc.ArgumentError, ValidatedOtherObject,
- someid=12, bogus=345)
-
- @testing.uses_deprecated()
- @testing.resolve_artifact_names
- def test_dont_clobber_methods(self):
- class MyClass(object):
- def expunge(self):
- return "an expunge !"
-
- Session.mapper(MyClass, table2)
-
- assert MyClass().expunge() == "an expunge !"
-
-
-class ScopedMapperTest2(_ScopedTest):
-
- @classmethod
- def define_tables(cls, metadata):
- Table('table1', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('data', String(30)),
- Column('type', String(30)))
- Table('table2', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
- Column('someid', None, ForeignKey('table1.id')),
- Column('somedata', String(30)))
-
- @classmethod
- def setup_classes(cls):
- class BaseClass(_base.ComparableEntity):
- pass
- class SubClass(BaseClass):
- pass
-
- @classmethod
- @testing.uses_deprecated()
- @testing.resolve_artifact_names
- def setup_mappers(cls):
- Session = scoped_session(sa.orm.sessionmaker())
-
- Session.mapper(BaseClass, table1,
- polymorphic_identity='base',
- polymorphic_on=table1.c.type)
- Session.mapper(SubClass, table2,
- polymorphic_identity='sub',
- inherits=BaseClass)
-
- cls.scoping['Session'] = Session
-
- @testing.resolve_artifact_names
- def test_inheritance(self):
- def expunge_list(l):
- for x in l:
- Session.expunge(x)
- return l
-
- b = BaseClass(data='b1')
- s = SubClass(data='s1', somedata='somedata')
- Session.commit()
- Session.expunge_all()
-
- eq_(expunge_list([BaseClass(data='b1'),
- SubClass(data='s1', somedata='somedata')]),
- BaseClass.query.all())
- eq_(expunge_list([SubClass(data='s1', somedata='somedata')]),
- SubClass.query.all())
import sqlalchemy as sa
from sqlalchemy.test import engines, testing, pickleable
-from sqlalchemy import Integer, String, ForeignKey, literal_column
+from sqlalchemy import Integer, String, ForeignKey, literal_column, event
from sqlalchemy.test.schema import Table
from sqlalchemy.test.schema import Column
from sqlalchemy.orm import mapper, relationship, create_session, \
"""The 'batch=False' flag on mapper()"""
names = []
- class TestExtension(sa.orm.MapperExtension):
+ class Events(object):
def before_insert(self, mapper, connection, instance):
self.current_instance = instance
names.append(instance.name)
def after_insert(self, mapper, connection, instance):
assert instance is self.current_instance
- mapper(User, users, extension=TestExtension(), batch=False)
+ mapper(User, users, batch=False)
+
+ evt = Events()
+ event.listen(evt.before_insert, "on_before_insert", User)
+ event.listen(evt.after_insert, "on_after_insert", User)
+
u1 = User(name='user1')
u2 = User(name='user2')
sa.orm.clear_mappers()
- m = mapper(User, users, extension=TestExtension())
+ m = mapper(User, users)
+ evt = Events()
+ event.listen(evt.before_insert, "on_before_insert", User)
+ event.listen(evt.after_insert, "on_after_insert", User)
+
u1 = User(name='user1')
u2 = User(name='user2')
session.add_all((u1, u2))
from sqlalchemy.test.testing import eq_
-class ExtensionCarrierTest(TestBase):
- def test_basic(self):
- carrier = util.ExtensionCarrier()
-
- assert 'translate_row' not in carrier
- assert carrier.translate_row() is interfaces.EXT_CONTINUE
- assert 'translate_row' not in carrier
-
- assert_raises(AttributeError, lambda: carrier.snickysnack)
-
- class Partial(object):
- def __init__(self, marker):
- self.marker = marker
- def translate_row(self, row):
- return self.marker
-
- carrier.append(Partial('end'))
- assert 'translate_row' in carrier
- assert carrier.translate_row(None) == 'end'
-
- carrier.push(Partial('front'))
- assert carrier.translate_row(None) == 'front'
-
- assert 'populate_instance' not in carrier
- carrier.append(interfaces.MapperExtension)
-
- # Py3K
- #assert 'populate_instance' not in carrier
- # Py2K
- assert 'populate_instance' in carrier
- # end Py2K
-
- assert carrier.interface
- for m in carrier.interface:
- assert getattr(interfaces.MapperExtension, m)
-
class AliasedClassTest(TestBase):
def point_map(self, cls):
table = Table('point', MetaData(),