with methods representing events. This is self-documenting via sphinx.
- implemented new model for pool, classmanager. Most events are
one or two args, so going back to allowing any kind of *arg, **kw
signature for events - this is simpler and improves performance,
though we lose the "we can add new keyword arguments at any
time" capability. perhaps there's some other way to approach that.
schema
inspector
types
+ event
interfaces
util
Interfaces
----------
+SQLAlchemy's interface system is now deprecated, and has been
+superseded by a more flexible and consistent event dispatch
+system. Please see :mod:`sqlalchemy.event` for a description.
+
.. automodule:: sqlalchemy.interfaces
:members:
:undoc-members:
:show-inheritance:
:undoc-members:
:inherited-members:
+ :exclude-members: append, chain, __init__
.. autoclass:: sqlalchemy.pool.QueuePool
:members:
def _do_commit(self):
self.connection._commit_twophase_impl(self.xid, self._is_prepared)
-class _EngineDispatch(event.Dispatch):
+class _EngineDispatch(event.Events):
def append(self, fn, identifier, target):
if isinstance(target.Connection, Connection):
target.Connection = _proxy_connection_cls(target.Connection, self)
Connection = Connection
_dispatch = event.dispatcher(_EngineDispatch)
+
def __init__(self, pool, dialect, url,
logging_name=None, echo=None, proxy=None,
execution_options=None
from operator import attrgetter
from sqlalchemy.engine import base, threadlocal, url
-from sqlalchemy import util, exc
+from sqlalchemy import util, exc, event
from sqlalchemy import pool as poollib
strategies = {}
if _initialize:
do_on_connect = dialect.on_connect()
if do_on_connect:
- def on_connect(conn, rec):
- conn = getattr(conn, '_sqla_unwrap', conn)
+ def on_connect(dbapi_connection, connection_record):
+ conn = getattr(dbapi_connection, '_sqla_unwrap', dbapi_connection)
if conn is None:
return
do_on_connect(conn)
+
+ event.listen(on_connect, 'on_first_connect', pool)
+ event.listen(on_connect, 'on_connect', pool)
- pool.add_listener({'first_connect': on_connect, 'connect':on_connect})
-
- def first_connect(conn, rec):
- c = base.Connection(engine, connection=conn)
+ def first_connect(dbapi_connection, connection_record):
+ c = base.Connection(engine, connection=dbapi_connection)
dialect.initialize(c)
- pool.add_listener({'first_connect':first_connect})
+ event.listen(first_connect, 'on_first_connect', pool)
return engine
+"""
+The event system handles all events throughout the sqlalchemy
+and sqlalchemy.orm packages.
+
+Event specifications:
+
+:attr:`sqlalchemy.pool.Pool.events`
+
+"""
+
from sqlalchemy import util
def listen(fn, identifier, target, *args):
"""Listen for events, passing to fn."""
- target._dispatch.append(fn, identifier, target, *args)
+ getattr(target.events, identifier).append(fn, target)
NO_RESULT = util.symbol('no_result')
+class _DispatchMeta(type):
+ def __init__(cls, classname, bases, dict_):
+ for k in dict_:
+ if k.startswith('on_'):
+ setattr(cls, k, EventDescriptor(dict_[k]))
+ return type.__init__(cls, classname, bases, dict_)
-class Dispatch(object):
+class Events(object):
+ __metaclass__ = _DispatchMeta
+
+ def __init__(self, parent_cls):
+ self.parent_cls = parent_cls
+
+
+class _ExecEvent(object):
+ def exec_and_clear(self, *args, **kw):
+ """Execute the given event once, then clear all listeners."""
- def append(self, identifier, fn, target):
- getattr(self, identifier).append(fn)
-
- def __getattr__(self, key):
- self.__dict__[key] = coll = []
- return coll
-
- def chain(self, identifier, chain_kw, **kw):
- ret = NO_RESULT
- for fn in getattr(self, identifier):
- ret = fn(**kw)
- kw['chain_kw'] = ret
- return ret
-
- def __call__(self, identifier, **kw):
- for fn in getattr(self, identifier):
- fn(**kw)
+ self(*args, **kw)
+ self[:] = []
+ def __call__(self, *args, **kw):
+ """Execute the given event."""
+ if self:
+ for fn in self:
+ fn(*args, **kw)
+
+class EventDescriptor(object):
+ """Represent an event type associated with a :class:`Events` class
+ as well as class-level listeners.
+
+ """
+ def __init__(self, fn):
+ self.__name__ = fn.__name__
+ self.__doc__ = fn.__doc__
+ self._clslevel = []
+
+ def append(self, obj, target):
+ self._clslevel.append((obj, target))
+
+ def __get__(self, obj, cls):
+ if obj is None:
+ return self
+ obj.__dict__[self.__name__] = result = Listeners()
+ result.extend([
+ fn for fn, target in
+ self._clslevel
+ if issubclass(obj.parent_cls, target)
+ ])
+ return result
+
+class Listeners(_ExecEvent, list):
+ """Represent a collection of listeners linked
+ to an instance of :class:`Events`."""
+
+ def append(self, obj, target):
+ list.append(self, obj)
+
class dispatcher(object):
- def __init__(self, dispatch_cls=Dispatch):
- self.dispatch_cls = dispatch_cls
- self._dispatch = dispatch_cls()
+ def __init__(self, events):
+ self.dispatch_cls = events
def __get__(self, obj, cls):
if obj is None:
- return self._dispatch
- obj.__dict__['_dispatch'] = disp = self.dispatch_cls()
- for key in self._dispatch.__dict__:
- if key.startswith('on_'):
- disp.__dict__[key] = self._dispatch.__dict__[k].copy()
+ return self.dispatch_cls
+ obj.__dict__['events'] = disp = self.dispatch_cls(cls)
return disp
"""Interfaces and abstract types."""
-from sqlalchemy.util import as_interface, adapt_kw_to_positional
+from sqlalchemy.util import as_interface
+from sqlalchemy import event
class PoolListener(object):
"""Hooks into the lifecycle of connections in a :class:`Pool`.
.. note:: :class:`PoolListener` is deprecated. Please
- refer to :func:`event.listen`.
+ refer to :func:`event.listen` as well as
+ :attr:`Pool.events`.
Usage::
@classmethod
def _adapt_listener(cls, self, listener):
- """Adapt a :class:`PoolListener` to individual
+ """Adapt a :class:`PoolListener` to individual
:class:`event.Dispatch` events.
"""
- listener = as_interface(listener,
- methods=('connect', 'first_connect', 'checkout', 'checkin'))
+ listener = as_interface(listener, methods=('connect',
+ 'first_connect', 'checkout', 'checkin'))
if hasattr(listener, 'connect'):
- self._dispatch.append('on_connect',
- adapt_kw_to_positional(listener.connect,
- 'dbapi_con', 'con_record'),
- self)
+ event.listen(listener.connect, 'on_connect', self)
if hasattr(listener, 'first_connect'):
- self._dispatch.append('on_first_connect',
- adapt_kw_to_positional(listener.first_connect,
- 'dbapi_con', 'con_record'),
- self)
+ event.listen(listener.first_connect, 'on_first_connect', self)
if hasattr(listener, 'checkout'):
- self._dispatch.append('on_checkout', listener.checkout, self)
+ event.listen(listener.checkout, 'on_checkout', self)
if hasattr(listener, 'checkin'):
- self._dispatch.append('on_checkin', listener.checkin, self)
+ event.listen(listener.checkin, 'on_checkin', self)
def connect(self, dbapi_con, con_record):
import types
import weakref
-from sqlalchemy import util
+from sqlalchemy import util, event
from sqlalchemy.orm import interfaces, collections, exc
import sqlalchemy.exceptions as sa_exc
dict_[self.key] = value
def fire_replace_event(self, state, dict_, value, previous, initiator):
-# value = self._dispatch.chain('set', 'value', state, value, previous, initiator or self)
+ #for fn in self.events.set:
+ # value = fn(state, value, previous, initiator or self)
for ext in self.extensions:
value = ext.set(state, value, previous, initiator or self)
return value
passive=PASSIVE_NO_FETCH)
-class Events(object):
- def __init__(self):
- self.original_init = object.__init__
- # Initialize to tuples instead of lists to minimize the memory
- # footprint
- self.on_init = ()
- self.on_init_failure = ()
- self.on_load = ()
- self.on_resurrect = ()
-
- def run(self, event, *args):
- for fn in getattr(self, event):
- fn(*args)
-
- def add_listener(self, event, listener):
- # not thread safe... problem? mb: nope
- bucket = getattr(self, event)
- if bucket == ():
- setattr(self, event, [listener])
- else:
- bucket.append(listener)
-
- def remove_listener(self, event, listener):
- bucket = getattr(self, event)
- bucket.remove(listener)
-
-
class ClassManager(dict):
"""tracks state information at the class level."""
MANAGER_ATTR = '_sa_class_manager'
STATE_ATTR = '_sa_instance_state'
- event_registry_factory = Events
deferred_scalar_loader = None
+ original_init = object.__init__
+
def __init__(self, class_):
self.class_ = class_
self.factory = None # where we came from, for inheritance bookkeeping
cls_state = manager_of_class(base)
if cls_state:
self.update(cls_state)
- self.events = self.event_registry_factory()
self.manage()
self._instrument_init()
+ class events(event.Events):
+ def on_init(self, state, instance, args, kwargs):
+ """"""
+
+ def on_init_failure(self, state, instance, args, kwargs):
+ """"""
+
+ def on_load(self, instance):
+ """"""
+
+ def on_resurrect(self, state, instance):
+ """"""
+
+ events = event.dispatcher(events)
+
@property
def is_mapped(self):
return 'mapper' in self.__dict__
# our own wrapper, but it would
# be nice to wrap the original __init__ and not our existing wrapper
# of such, since this adds method overhead.
- self.events.original_init = self.class_.__init__
+ self.original_init = self.class_.__init__
self.new_init = _generate_init(self.class_, self)
self.install_member('__init__', self.new_init)
from itertools import chain, groupby
deque = __import__('collections').deque
-from sqlalchemy import sql, util, log, exc as sa_exc
+from sqlalchemy import sql, util, log, exc as sa_exc, event
from sqlalchemy.sql import expression, visitors, operators, util as sqlutil
from sqlalchemy.orm import attributes, sync, exc as orm_exc, unitofwork
from sqlalchemy.orm.interfaces import (
if manager.info.get(_INSTRUMENTOR, False):
return
- event_registry = manager.events
- event_registry.add_listener('on_init', _event_on_init)
- event_registry.add_listener('on_init_failure', _event_on_init_failure)
- event_registry.add_listener('on_resurrect', _event_on_resurrect)
+ event.listen(_event_on_init, 'on_init', manager)
+ event.listen(_event_on_init_failure, 'on_init_failure', manager)
+ event.listen(_event_on_resurrect, 'on_resurrect', manager)
for key, method in util.iterate_attributes(self.class_):
if isinstance(method, types.FunctionType):
if hasattr(method, '__sa_reconstructor__'):
- event_registry.add_listener('on_load', method)
+ event.listen(method, 'on_load', manager)
elif hasattr(method, '__sa_validators__'):
for name in method.__sa_validators__:
self._validators[name] = method
if 'reconstruct_instance' in self.extension:
def reconstruct(instance):
self.extension.reconstruct_instance(self, instance)
- event_registry.add_listener('on_load', reconstruct)
+ event.listen(reconstruct, 'on_load', manager)
manager.info[_INSTRUMENTOR] = self
if 'init_instance' in instrumenting_mapper.extension:
instrumenting_mapper.extension.init_instance(
instrumenting_mapper, instrumenting_mapper.class_,
- state.manager.events.original_init,
+ state.manager.original_init,
instance, args, kwargs)
def _event_on_init_failure(state, instance, args, kwargs):
util.warn_exception(
instrumenting_mapper.extension.init_failed,
instrumenting_mapper, instrumenting_mapper.class_,
- state.manager.events.original_init, instance, args, kwargs)
+ state.manager.original_init, instance, args, kwargs)
def _event_on_resurrect(state, instance):
# re-populate the primary key elements
self, instance, args = mixed[0], mixed[1], mixed[2:]
manager = self.manager
- for fn in manager.events.on_init:
- fn(self, instance, args, kwargs)
+ manager.events.on_init(self, instance, args, kwargs)
# LESSTHANIDEAL:
# adjust for the case where the InstanceState was created before
self.mutable_dict = {}
try:
- return manager.events.original_init(*mixed[1:], **kwargs)
+ return manager.original_init(*mixed[1:], **kwargs)
except:
- for fn in manager.events.on_init_failure:
- fn(self, instance, args, kwargs)
+ manager.events.on_init_failure(self, instance, args, kwargs)
raise
def get_history(self, key, **kwargs):
return [x]
def _run_on_load(self, instance):
- self.manager.events.run('on_load', instance)
+ self.manager.events.on_load(instance)
def __getstate__(self):
d = {'instance':self.obj()}
obj.__dict__.update(self.mutable_dict)
# re-establishes identity attributes from the key
- self.manager.events.run('on_resurrect', self, obj)
+ self.manager.events.on_resurrect(self, obj)
# TODO: don't really think we should run this here.
# resurrect is only meant to preserve the minimal state needed to
self._reset_on_return = reset_on_return
self.echo = echo
if _dispatch:
- self._dispatch = _dispatch
+ self.events = _dispatch
if listeners:
for l in listeners:
self.add_listener(l)
- if False:
- # this might be a nice way to define events and have them
- # documented at the same time.
- class events(event.Dispatch):
- def on_connect(self, dbapi_con, con_record):
- """Called once for each new DB-API connection or Pool's ``creator()``.
+ class events(event.Events):
+ """Available events for :class:`Pool`.
+
+ The methods here define the name of an event as well
+ as the names of the arguments passed positionally to
+ listener functions.
+
+ e.g.::
+
+ from sqlalchemy import event
+ event.listen(fn, 'on_checkout', Pool)
+
+ """
+
+ def on_connect(self, dbapi_connection, connection_record):
+ """Called once for each new DB-API connection or Pool's ``creator()``.
+
+ :param dbapi_connection:
+ A newly connected raw DB-API connection (not a SQLAlchemy
+ ``Connection`` wrapper).
+
+ :param connection_record:
+ The ``_ConnectionRecord`` that persistently manages the connection
+
+ """
+
+ def on_first_connect(self, dbapi_connection, connection_record):
+ """Called exactly once for the first DB-API connection.
+
+ :param dbapi_connection:
+ A newly connected raw DB-API connection (not a SQLAlchemy
+ ``Connection`` wrapper).
- dbapi_con
- A newly connected raw DB-API connection (not a SQLAlchemy
- ``Connection`` wrapper).
+ :param connection_record:
+ The ``_ConnectionRecord`` that persistently manages the connection
- con_record
- The ``_ConnectionRecord`` that persistently manages the connection
+ """
- """
+ def on_checkout(self, dbapi_connection, connection_record, connection_proxy):
+ """Called when a connection is retrieved from the Pool.
+
+ :param dbapi_connection:
+ A raw DB-API connection
+
+ :param connection_record:
+ The ``_ConnectionRecord`` that persistently manages the connection
+
+ :param connection_proxy:
+ The ``_ConnectionFairy`` which manages the connection for the span of
+ the current checkout.
+
+ If you raise an ``exc.DisconnectionError``, the current
+ connection will be disposed and a fresh connection retrieved.
+ Processing of all checkout listeners will abort and restart
+ using the new connection.
+ """
+
+ def on_checkin(self, dbapi_connection, connection_record):
+ """Called when a connection returns to the pool.
+
+ Note that the connection may be closed, and may be None if the
+ connection has been invalidated. ``checkin`` will not be called
+ for detached connections. (They do not return to the pool.)
+
+ :param dbapi_connection:
+ A raw DB-API connection
+
+ :param connection_record:
+ The ``_ConnectionRecord`` that persistently manages the connection
+
+ """
+ events = event.dispatcher(events)
- _dispatch = event.dispatcher()
-
@util.deprecated("Use event.listen()")
def add_listener(self, listener):
"""Add a ``PoolListener``-like object to this pool.
self.connection = self.__connect()
self.info = {}
- if pool._dispatch.on_first_connect:
- pool._dispatch('on_first_connect', dbapi_con=self.connection, con_record=self)
- del pool._dispatch.on_first_connect
- if pool._dispatch.on_connect:
- pool._dispatch('on_connect', dbapi_con=self.connection, con_record=self)
+ pool.events.on_first_connect.exec_and_clear(self.connection, self)
+ pool.events.on_connect(self.connection, self)
def close(self):
if self.connection is not None:
if self.connection is None:
self.connection = self.__connect()
self.info.clear()
- if self.__pool._dispatch.on_connect:
- self.__pool._dispatch('on_connect', dbapi_con=self.connection, con_record=self)
+ if self.__pool.events.on_connect:
+ self.__pool.events.on_connect(self.connection, self)
elif self.__pool._recycle > -1 and \
time.time() - self.starttime > self.__pool._recycle:
self.__pool.logger.info(
self.__close()
self.connection = self.__connect()
self.info.clear()
- if self.__pool._dispatch.on_connect:
- self.__pool._dispatch('on_connect', dbapi_con=self.connection, con_record=self)
+ if self.__pool.events.on_connect:
+ self.__pool.events.on_connect(self.connection, self)
return self.connection
def __close(self):
if connection_record is not None:
connection_record.fairy = None
pool.logger.debug("Connection %r being returned to pool", connection)
- if pool._dispatch.on_checkin:
- pool._dispatch('on_checkin', dbapi_con=connection, con_record=connection_record)
+ if pool.events.on_checkin:
+ pool.events.on_checkin(connection, connection_record)
pool.return_conn(connection_record)
_refs = set()
raise exc.InvalidRequestError("This connection is closed")
self.__counter += 1
- if not self._pool._dispatch.on_checkout or self.__counter != 1:
+ if not self._pool.events.on_checkout or self.__counter != 1:
return self
# Pool listeners can trigger a reconnection on checkout
attempts = 2
while attempts > 0:
try:
- self._pool._dispatch('on_checkout', dbapi_con=self.connection,
- con_record=self._connection_record,
- con_proxy=self)
+ self._pool.events.on_checkout(self.connection,
+ self._connection_record,
+ self)
return self
except exc.DisconnectionError, e:
self._pool.logger.info(
echo=self.echo,
logging_name=self._orig_logging_name,
use_threadlocal=self._use_threadlocal,
- _dispatch=self._dispatch)
+ _dispatch=self.events)
def dispose(self):
"""Dispose of this pool."""
recycle=self._recycle, echo=self.echo,
logging_name=self._orig_logging_name,
use_threadlocal=self._use_threadlocal,
- _dispatch=self._dispatch)
+ _dispatch=self.events)
def do_return_conn(self, conn):
try:
echo=self.echo,
logging_name=self._orig_logging_name,
use_threadlocal=self._use_threadlocal,
- _dispatch=self._dispatch)
+ _dispatch=self.events)
def dispose(self):
pass
reset_on_return=self._reset_on_return,
echo=self.echo,
logging_name=self._orig_logging_name,
- _dispatch=self._dispatch)
+ _dispatch=self.events)
def create_connection(self):
return self._conn
self.logger.info("Pool recreating")
return AssertionPool(self._creator, echo=self.echo,
logging_name=self._orig_logging_name,
- _dispatch=self._dispatch)
+ _dispatch=self.events)
def do_get(self):
if self._checked_out:
"Argument '%s' is expected to be of type '%s', got '%s'" %
(name, argtype, type(arg)))
-def adapt_kw_to_positional(fn, *args):
- def call(**kw):
- return fn(*[kw[a] for a in args])
- return call
-
_creation_order = 1
def set_creation_order(instance):
"""Assign a '_creation_order' sequence to the given instance.
from sqlalchemy.test.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
-from sqlalchemy import MetaData, Integer, ForeignKey, util
+from sqlalchemy import MetaData, Integer, ForeignKey, util, event
from sqlalchemy.test.schema import Table
from sqlalchemy.test.schema import Column
from sqlalchemy.orm import mapper, relationship, create_session, attributes, class_mapper, clear_mappers
manager = attributes.manager_of_class(cls)
def on_init(state, instance, args, kwargs):
canary.append((cls, 'on_init', type(instance)))
- manager.events.add_listener('on_init', on_init)
+ event.listen(on_init, 'on_init', manager)
def test_ai(self):
inits = []
try:
attributes.register_class(A)
manager = attributes.manager_of_class(A)
- manager.events.add_listener('on_load', canary)
+ event.listen(canary, 'on_load', manager)
a = A()
p_a = pickle.dumps(a)
@modifies_instrumentation_finders
def test_subclassed(self):
- class MyEvents(attributes.Events):
+ class MyEvents(attributes.ClassManager.events):
pass
class MyClassManager(attributes.ClassManager):
- event_registry_factory = MyEvents
+ events = event.dispatcher(MyEvents)
attributes.instrumentation_finders.insert(0, lambda cls: MyClassManager)