# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-import inspect
+import inspect as _inspect
import sys
import sqlalchemy.exc as exceptions
UniqueConstraint,
)
+from sqlalchemy.inspection import inspect
+
from sqlalchemy.engine import create_engine, engine_from_config
__all__ = sorted(name for name, obj in locals().items()
- if not (name.startswith('_') or inspect.ismodule(obj)))
+ if not (name.startswith('_') or _inspect.ismodule(obj)))
__version__ = '0.7.7'
-del inspect, sys
+del _inspect, sys
from sqlalchemy import util as _sa_util
_sa_util.importlater.resolve_all()
from sqlalchemy.util import topological
from sqlalchemy.types import TypeEngine
from sqlalchemy import schema as sa_schema
-
+from sqlalchemy import inspection
+from sqlalchemy.engine.base import Connectable
@util.decorator
def cache(fn, self, con, *args, **kw):
return bind.dialect.inspector(bind)
return Inspector(bind)
+ @inspection._inspects(Connectable)
+ def _insp(bind):
+ return Inspector.from_engine(bind)
+
@property
def default_schema_name(self):
"""Return the default schema name presented by the dialect
--- /dev/null
+# sqlalchemy/inspect.py
+# Copyright (C) 2005-2012 the SQLAlchemy authors and contributors <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+"""Base inspect API.
+
+:func:`.inspect` provides access to a contextual object
+regarding a subject.
+
+Various subsections of SQLAlchemy,
+such as the :class:`.Inspector`, :class:`.Mapper`, and
+others register themselves with the "inspection registry" here
+so that they may return a context object given a certain kind
+of argument.
+"""
+
+from sqlalchemy import util
+_registrars = util.defaultdict(list)
+
+def inspect(subject):
+ type_ = type(subject)
+ for cls in type_.__mro__:
+ if cls in _registrars:
+ reg = _registrars[cls]
+ break
+ else:
+ raise exc.InvalidRequestError(
+ "No inspection system is "
+ "available for object of type %s" %
+ type_)
+ return reg(subject)
+
+def _inspects(*types):
+ def decorate(fn_or_cls):
+ for type_ in types:
+ if type_ in _registrars:
+ raise AssertionError(
+ "Type %s is already "
+ "registered" % type_)
+ _registrars[type_] = fn_or_cls
+ return fn_or_cls
+ return decorate
from sqlalchemy.orm import exc, collections, events
from operator import attrgetter, itemgetter
-from sqlalchemy import event, util
+from sqlalchemy import event, util, inspection
import weakref
from sqlalchemy.orm import state, attributes
import sys
sessionlib = util.importlater("sqlalchemy.orm", "session")
properties = util.importlater("sqlalchemy.orm", "properties")
+descriptor_props = util.importlater("sqlalchemy.orm", "descriptor_props")
__all__ = (
'Mapper',
continue
yield c
- @property
+ @util.memoized_property
def properties(self):
- raise NotImplementedError(
- "Public collection of MapperProperty objects is "
- "provided by the get_property() and iterate_properties "
- "accessors.")
+ if _new_mappers:
+ configure_mappers()
+ return util.ImmutableProperties(self._props)
+
+ @_memoized_configured_property
+ def synonyms(self):
+ return self._filter_properties(descriptor_props.SynonymProperty)
+
+ @_memoized_configured_property
+ def column_attrs(self):
+ return self._filter_properties(properties.ColumnProperty)
+
+ @_memoized_configured_property
+ def relationships(self):
+ return self._filter_properties(properties.RelationshipProperty)
+
+ @_memoized_configured_property
+ def composites(self):
+ return self._filter_properties(descriptor_props.CompositeProperty)
+
+ def _filter_properties(self, type_):
+ if _new_mappers:
+ configure_mappers()
+ return dict(
+ (k, v) for k, v in self._props.iteritems()
+ if isinstance(v, type_)
+ )
@_memoized_configured_property
def _get_clause(self):
def pending(self):
return {}
+ @util.memoized_property
+ def mapper(self):
+ return self.manager.mapper
+
@property
def has_identity(self):
return bool(self.key)
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-from sqlalchemy import sql, util, event, exc as sa_exc
+from sqlalchemy import sql, util, event, exc as sa_exc, inspection
from sqlalchemy.sql import expression, util as sql_util, operators
from sqlalchemy.orm.interfaces import MapperExtension, EXT_CONTINUE,\
PropComparator, MapperProperty
Raises UnmappedInstanceError if no mapping is configured.
+ This function is available via the inspection system as::
+
+ inspect(instance).mapper
+
+ """
+ return object_state(instance).mapper
+
+@inspection._inspects(object)
+def object_state(instance):
+ """Given an object, return the primary Mapper associated with the object
+ instance.
+
+ Raises UnmappedInstanceError if no mapping is configured.
+
+ This function is available via the inspection system as::
+
+ inspect(instance)
+
"""
try:
- state = attributes.instance_state(instance)
- return state.manager.mapper
+ return attributes.instance_state(instance)
except exc.UnmappedClassError:
raise exc.UnmappedInstanceError(instance)
except exc.NO_STATE:
raise exc.UnmappedInstanceError(instance)
+
+@inspection._inspects(type)
def class_mapper(class_, compile=True):
"""Given a class, return the primary :class:`.Mapper` associated
with the key.
on the given class, or :class:`.ArgumentError` if a non-class
object is passed.
+ This function is available via the inspection system as::
+
+ inspect(some_mapped_class)
+
"""
try:
--- /dev/null
+"""test the inspection registry system."""
+
+from test.lib.testing import eq_, assert_raises
+from sqlalchemy import exc, util
+from sqlalchemy import inspection, inspect
+from test.lib import fixtures
+
+class TestFixture(object):
+ pass
+
+class TestEvents(fixtures.TestBase):
+ """Test class- and instance-level event registration."""
+
+ def tearDown(self):
+ for type_ in list(inspection._registrars):
+ if issubclass(type_, TestFixture):
+ del inspection._registrars[type_]
+
+ def test_def_insp(self):
+ class SomeFoo(TestFixture):
+ pass
+
+ @inspection._inspects(SomeFoo)
+ def insp_somefoo(subject):
+ return {"insp":subject}
+
+ somefoo = SomeFoo()
+ insp = inspect(somefoo)
+ assert insp["insp"] is somefoo
+
+ def test_class_insp(self):
+ class SomeFoo(TestFixture):
+ pass
+
+ @inspection._inspects(SomeFoo)
+ class SomeFooInspect(object):
+ def __init__(self, target):
+ self.target = target
+
+ somefoo = SomeFoo()
+ insp = inspect(somefoo)
+ assert isinstance(insp, SomeFooInspect)
+ assert insp.target is somefoo
+
+ def test_hierarchy_insp(self):
+ class SomeFoo(TestFixture):
+ pass
+
+ class SomeSubFoo(SomeFoo):
+ pass
+
+ @inspection._inspects(SomeFoo)
+ def insp_somefoo(subject):
+ return 1
+
+ @inspection._inspects(SomeSubFoo)
+ def insp_somesubfoo(subject):
+ return 2
+
+ somefoo = SomeFoo()
+ eq_(inspect(SomeFoo()), 1)
+ eq_(inspect(SomeSubFoo()), 2)
from test.lib.testing import eq_, assert_raises, assert_raises_message
import StringIO, unicodedata
from sqlalchemy import types as sql_types
-from sqlalchemy import schema, events, event
-from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy import schema, events, event, inspect
from sqlalchemy import MetaData, Integer, String
from test.lib.schema import Table, Column
import sqlalchemy as sa
testing, engines, AssertsCompiledSQL
from test.lib import fixtures
-create_inspector = Inspector.from_engine
-
metadata, users = None, None
class ReflectionTest(fixtures.TestBase, ComparesTables):
def test_inspector_conn_closing(self):
m1 = MetaData()
c = testing.db.connect()
- i = Inspector.from_engine(testing.db)
+ i = inspect(testing.db)
assert not c.closed
@testing.provide_metadata
@testing.requires.unicode_connections
def test_get_names(self):
- inspector = Inspector.from_engine(self.bind)
+ inspector = inspect(self.bind)
names = dict(
(tname, (cname, ixname)) for tname, cname, ixname in self.names
)
@testing.requires.schemas
def test_get_schema_names(self):
- insp = Inspector(testing.db)
+ insp = inspect(testing.db)
self.assert_('test_schema' in insp.get_schema_names())
def test_dialect_initialize(self):
engine = engines.testing_engine()
assert not hasattr(engine.dialect, 'default_schema_name')
- insp = Inspector(engine)
+ insp = inspect(engine)
assert hasattr(engine.dialect, 'default_schema_name')
def test_get_default_schema_name(self):
- insp = Inspector(testing.db)
+ insp = inspect(testing.db)
eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
@testing.provide_metadata
meta.create_all()
_create_views(meta.bind, schema)
try:
- insp = Inspector(meta.bind)
+ insp = inspect(meta.bind)
if table_type == 'view':
table_names = insp.get_view_names(schema)
table_names.sort()
_create_views(meta.bind, schema)
table_names = ['users_v', 'email_addresses_v']
try:
- insp = Inspector(meta.bind)
+ insp = inspect(meta.bind)
for table_name, table in zip(table_names, (users,
addresses)):
schema_name = schema
meta = self.metadata
users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
- insp = Inspector(meta.bind)
+ insp = inspect(meta.bind)
users_pkeys = insp.get_primary_keys(users.name,
schema=schema)
eq_(users_pkeys, ['user_id'])
meta = self.metadata
users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
- insp = Inspector(meta.bind)
+ insp = inspect(meta.bind)
expected_schema = schema
# users
users_fkeys = insp.get_foreign_keys(users.name,
createIndexes(meta.bind, schema)
# The database may decide to create indexes for foreign keys, etc.
# so there may be more indexes than expected.
- insp = Inspector(meta.bind)
+ insp = inspect(meta.bind)
indexes = insp.get_indexes('users', schema=schema)
expected_indexes = [
{'unique': False,
view_name1 = 'users_v'
view_name2 = 'email_addresses_v'
try:
- insp = Inspector(meta.bind)
+ insp = inspect(meta.bind)
v1 = insp.get_view_definition(view_name1, schema=schema)
self.assert_(v1)
v2 = insp.get_view_definition(view_name2, schema=schema)
meta = self.metadata
users, addresses, dingalings = createTables(meta, schema)
meta.create_all()
- insp = create_inspector(meta.bind)
+ insp = inspect(meta.bind)
oid = insp.get_table_oid(table_name, schema)
self.assert_(isinstance(oid, (int, long)))
from sqlalchemy import util
from test.lib.schema import Table
from test.lib.schema import Column
-from sqlalchemy.orm import attributes
+from sqlalchemy.orm import attributes, mapper, relationship, \
+ backref, configure_mappers
from test.lib import fixtures
__all__ = ()
class CompositePk(Base):
pass
+ @classmethod
+ def _setup_stock_mapping(cls):
+ Node, composite_pk_table, users, Keyword, items, Dingaling, \
+ order_items, item_keywords, Item, User, dingalings, \
+ Address, keywords, CompositePk, nodes, Order, orders, \
+ addresses = cls.classes.Node, \
+ cls.tables.composite_pk_table, cls.tables.users, \
+ cls.classes.Keyword, cls.tables.items, \
+ cls.classes.Dingaling, cls.tables.order_items, \
+ cls.tables.item_keywords, cls.classes.Item, \
+ cls.classes.User, cls.tables.dingalings, \
+ cls.classes.Address, cls.tables.keywords, \
+ cls.classes.CompositePk, cls.tables.nodes, \
+ cls.classes.Order, cls.tables.orders, cls.tables.addresses
+
+ mapper(User, users, properties={
+ 'addresses':relationship(Address, backref='user', order_by=addresses.c.id),
+ 'orders':relationship(Order, backref='user', order_by=orders.c.id), # o2m, m2o
+ })
+ mapper(Address, addresses, properties={
+ 'dingaling':relationship(Dingaling, uselist=False, backref="address") #o2o
+ })
+ mapper(Dingaling, dingalings)
+ mapper(Order, orders, properties={
+ 'items':relationship(Item, secondary=order_items, order_by=items.c.id), #m2m
+ 'address':relationship(Address), # m2o
+ })
+ mapper(Item, items, properties={
+ 'keywords':relationship(Keyword, secondary=item_keywords) #m2m
+ })
+ mapper(Keyword, keywords)
+
+ mapper(Node, nodes, properties={
+ 'children':relationship(Node,
+ backref=backref('parent', remote_side=[nodes.c.id])
+ )
+ })
+
+ mapper(CompositePk, composite_pk_table)
+
+ configure_mappers()
+
@classmethod
def define_tables(cls, metadata):
Table('users', metadata,
--- /dev/null
+"""test the inspection registry system."""
+
+from test.lib.testing import eq_, assert_raises
+from sqlalchemy import exc, util
+from sqlalchemy import inspect
+from test.orm import _fixtures
+from sqlalchemy.orm import class_mapper, synonym
+from sqlalchemy.orm.attributes import instance_state
+
+class TestORMInspection(_fixtures.FixtureTest):
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+ inspect(cls.classes.User).add_property(
+ "name_syn",synonym("name")
+ )
+
+ def test_class_mapper(self):
+ User = self.classes.User
+
+ assert inspect(User) is class_mapper(User)
+
+ def test_instance_state(self):
+ User = self.classes.User
+ u1 = User()
+
+ assert inspect(u1) is instance_state(u1)
+
+ def test_synonyms(self):
+ User = self.classes.User
+ syn = inspect(User).synonyms
+
+ # TODO: some of the synonym debacle in 0.7
+ # has led User.name_syn.property to be the
+ # ColumnProperty. not sure if we want that
+ # implicit jump in there though, perhaps get Query/etc. to
+ # call upon "effective_property" or something like that
+
+ eq_(inspect(User).synonyms, {
+ "name_syn":class_mapper(User).get_property("name_syn")
+ })
+
+ # TODO: test all these accessors...
+
+"""
+# column collection
+>>> b.columns
+[<id column>, <name column>]
+
+# its a ColumnCollection
+>>> b.columns.id
+<id column>
+
+# i.e. from mapper
+>>> b.primary_key
+(<id column>, )
+
+# i.e. from mapper
+>>> b.local_table
+<user table>
+
+# ColumnProperty
+>>> b.attr.id.columns
+[<id column>]
+
+# but perhaps we use a collection with some helpers
+>>> b.attr.id.columns.first
+<id column>
+
+# and a mapper? its None since this is a column
+>>> b.attr.id.mapper
+None
+
+# attr is basically the _props
+>>> b.attr.keys()
+['id', 'name', 'name_syn', 'addresses']
+
+# b itself is likely just the mapper
+>>> b
+<User mapper>
+
+# get only column attributes
+>>> b.column_attrs
+[<id prop>, <name prop>]
+
+# its a namespace
+>>> b.column_attrs.id
+<id prop>
+
+# get only synonyms
+>>> b.synonyms
+[<name syn prop>]
+
+# get only relationships
+>>> b.relationships
+[<addresses prop>]
+
+# its a namespace
+>>> b.relationships.addresses
+<addresses prop>
+
+# point inspect() at a class level attribute,
+# basically returns ".property"
+>>> b = inspect(User.addresses)
+>>> b
+<addresses prop>
+
+# mapper
+>>> b.mapper
+<Address mapper>
+
+# None columns collection, just like columnprop has empty mapper
+>>> b.columns
+None
+
+# the parent
+>>> b.parent
+<User mapper>
+
+# __clause_element__()
+>>> b.expression
+User.id==Address.user_id
+
+>>> inspect(User.id).expression
+<id column with ORM annotations>
+
+
+# inspect works on instances !
+>>> u1 = User(id=3, name='x')
+>>> b = inspect(u1)
+
+# what's b here ? probably InstanceState
+>>> b
+<InstanceState>
+
+>>> b.attr.keys()
+['id', 'name', 'name_syn', 'addresses']
+
+# this is class level stuff - should this require b.mapper.columns ?
+>>> b.columns
+[<id column>, <name column>]
+
+# does this return '3'? or an object?
+>>> b.attr.id
+<magic attribute inspect thing>
+
+# or does this ?
+>>> b.attr.id.value
+3
+
+>>> b.attr.id.history
+<history object>
+
+>>> b.attr.id.history.unchanged
+3
+
+>>> b.attr.id.history.deleted
+None
+
+# lets assume the object is persistent
+>>> s = Session()
+>>> s.add(u1)
+>>> s.commit()
+
+# big one - the primary key identity ! always
+# works in query.get()
+>>> b.identity
+[3]
+
+# the mapper level key
+>>> b.identity_key
+(User, [3])
+
+>>> b.persistent
+True
+
+>>> b.transient
+False
+
+>>> b.deleted
+False
+
+>>> b.detached
+False
+
+>>> b.session
+<session>
+
+# the object. this navigates obj()
+# of course, would be nice if it was b.obj...
+>>> b.object_
+<User instance u1>
+
+"""
- def test_prop_accessor(self):
- users, User = self.tables.users, self.classes.User
-
- mapper(User, users)
- assert_raises(NotImplementedError,
- getattr, sa.orm.class_mapper(User), 'properties')
-
-
def test_friendly_attribute_str_on_uncompiled_boom(self):
User, users = self.classes.User, self.tables.users
@classmethod
def setup_mappers(cls):
- Node, composite_pk_table, users, Keyword, items, Dingaling, \
- order_items, item_keywords, Item, User, dingalings, \
- Address, keywords, CompositePk, nodes, Order, orders, \
- addresses = cls.classes.Node, \
- cls.tables.composite_pk_table, cls.tables.users, \
- cls.classes.Keyword, cls.tables.items, \
- cls.classes.Dingaling, cls.tables.order_items, \
- cls.tables.item_keywords, cls.classes.Item, \
- cls.classes.User, cls.tables.dingalings, \
- cls.classes.Address, cls.tables.keywords, \
- cls.classes.CompositePk, cls.tables.nodes, \
- cls.classes.Order, cls.tables.orders, cls.tables.addresses
-
- mapper(User, users, properties={
- 'addresses':relationship(Address, backref='user', order_by=addresses.c.id),
- 'orders':relationship(Order, backref='user', order_by=orders.c.id), # o2m, m2o
- })
- mapper(Address, addresses, properties={
- 'dingaling':relationship(Dingaling, uselist=False, backref="address") #o2o
- })
- mapper(Dingaling, dingalings)
- mapper(Order, orders, properties={
- 'items':relationship(Item, secondary=order_items, order_by=items.c.id), #m2m
- 'address':relationship(Address), # m2o
- })
- mapper(Item, items, properties={
- 'keywords':relationship(Keyword, secondary=item_keywords) #m2m
- })
- mapper(Keyword, keywords)
-
- mapper(Node, nodes, properties={
- 'children':relationship(Node,
- backref=backref('parent', remote_side=[nodes.c.id])
- )
- })
-
- mapper(CompositePk, composite_pk_table)
-
- configure_mappers()
+ cls._setup_stock_mapping()
class MiscTest(QueryTest):
run_create_tables = None