- Added a new `relation()` keyword `back_populates`. This allows configuation of backreferences using explicit relations. [ticket:781] This is required when creating bidirectional relations between a hierarchy of concrete mappers and another class. [ticket:1237]
- Test coverage added for `relation()` objects specified on concrete mappers. [ticket:1237]
- A short documentation example added for bidirectional relations specified on concrete mappers. [ticket:1237]
- Mappers now instrument class attributes upon construction with the final InstrumentedAttribute object which remains persistent. The `_CompileOnAttr`/`__getattribute__()` methodology has been removed. The net effect is that Column-based mapped class attributes can now be used fully at the class level without invoking a mapper compilation operation, greatly simplifying typical usage patterns within declarative. [ticket:1269]
- Index now accepts column-oriented InstrumentedAttributes (i.e. column-based mapped class attributes) as column arguments. [ticket:1214]
- Broke up attributes.register_attribute into two separate functions register_descriptor and register_attribute_impl. The first assembles an InstrumentedAttribute or Proxy descriptor, the second assembles the AttributeImpl inside the InstrumentedAttribute. register_attribute remains for outside compatibility. The argument lists have been simplified.
- Removed class_manager argument from all but MutableScalarAttributeImpl (the branch had removed class_ as well but this has been reverted locally to support the serializer extension).
- Mapper's previous construction of _CompileOnAttr now moves to a new MapperProperty.instrument_class() method which is called on all MapperProperty objects at the moment the mapper receives them. All MapperProperty objects now call attributes.register_descriptor within that method to assemble an InstrumentedAttribute object directly.
- InstrumentedAttribute now receives the "property" attribute from the given PropComparator. The guesswork within the constructor is removed, and allows "property" to serve as a mapper compilation trigger.
- RelationProperty.Comparator now triggers compilation of its parent mapper within a util.memoized_property accessor for the "property" attribute, which is used instead of "prop" (we can probably remove "prop").
- ColumnProperty and similar handle most of their initialization in their __init__ method since they must function fully at the class level before mappers are compiled.
- SynonymProperty and ComparableProperty move their class instrumentation logic to the new instrument_class() method.
- LoaderStrategy objects now add their state to existing InstrumentedAttributes using attributes.register_attribute_impl. Both column and relation-based loaders instrument in the same way now, with a unique InstrumentedAttribute *and* a unique AttributeImpl for each class in the hierarchy. attribute.parententity should now be correct in all cases.
- Removed unitofwork.register_attribute, and simpified the _register_attribute methods into a single function in strategies.py. unitofwork exports the UOWEventHandler extension directly.
- To accomodate the multiple AttributeImpls across a class hierarchy, the sethasparent() method now uses an optional "parent_token" attribute to identify the "parent". AbstractRelationLoader sends the MapperProperty along to serve as this token. If the token isn't present (which is only the case in the attributes unit tests), the AttributeImpl is used instead, which is essentially the same as the old behavior.
- Added new ConcreteInheritedProperty MapperProperty. This is invoked for concrete mappers within _adapt_inherited_property() to accomodate concrete mappers which inherit unhandled attributes from the base class, and basically raises an exception upon access. [ticket:1237]
- attributes.register_attribute and register_descriptor will now re-instrument an attribute unconditionally without checking for a previous attribute. Not sure if this is controversial. It's needed so that ConcreteInheritedProperty instrumentation can be overridden by an incoming legit MapperProperty without any complexity.
- Added new UninstrumentedColumnLoader LoaderStrategy. This is used by the polymorphic_on argument when the given column is not represented within the mapped selectable, as is typical with a concrete scenario which maps to a polymorphic union. It does not configure class instrumentation, keeping polymorphic_on from getting caught up in the new concrete attribute-checking logic.
- RelationProperty now records its "backref" attributes using a set assigned to `_reverse_property` instead of a scalar. The `back_populates` keyword allows any number of properties to be involved in a single bidirectional relation. Changes were needed to RelationProperty.merge(), DependencyProcessor to accomodate for the new multiple nature of this attribute.
- Generalized the methodology used by ManyToManyDP to check for "did the other dependency already handle this direction", building on the `_reverse_property` collection.
- post_update logic within dependency.py moves to use the same methodology as ManyToManyDP so that "did the other dependency do this already" checks are made to be specific to the two dependent instances.
- Caught that RelationProperty.merge() was writing to instance.__dict__ directly (!) - repaired to talk to instance_state.dict.
- Removed needless eager loading example from concrete mapper docs.
- Added test for [ticket:965].
- Added the usual Node class/nodes table to orm/_fixtures.py, but haven't used it for anything yet. We can potentially update test/orm/query.py to use this fixture.
- Other test/documentation cleanup.
- orm
- Modernized the "no mapped table" exception and added a more
explicit __table__/__tablename__ exception to declarative.
+
+ - Concrete inheriting mappers now instrument attributes which
+ are inherited from the superclass, but are not defined for
+ the concrete mapper itself, with an InstrumentedAttribute that
+ issues a descriptive error when accessed. [ticket:1237]
+
+ - Added a new `relation()` keyword `back_populates`. This
+ allows configuation of backreferences using explicit
+ relations. [ticket:781] This is required when creating
+ bidirectional relations between a hierarchy of concrete
+ mappers and another class. [ticket:1237]
+
+ - Test coverage added for `relation()` objects specified on
+ concrete mappers. [ticket:1237]
+
+ - A short documentation example added for bidirectional
+ relations specified on concrete mappers. [ticket:1237]
+
+ - Mappers now instrument class attributes upon construction
+ with the final InstrumentedAttribute object which remains
+ persistent. The `_CompileOnAttr`/`__getattribute__()`
+ methodology has been removed. The net effect is that
+ Column-based mapped class attributes can now be used fully
+ at the class level without invoking a mapper compilation
+ operation, greatly simplifying typical usage patterns
+ within declarative. [ticket:1269]
+
+- schema
+ - Index now accepts column-oriented InstrumentedAttributes
+ (i.e. column-based mapped class attributes) as column
+ arguments. [ticket:1214]
- mysql
- Added the missing keywords from MySQL 4.1 so they get escaped
query.options(defer('summary')).all()
query.options(undefer('excerpt')).all()
-And an entire "deferred group", i.e. which uses the ``group`` keyword argument to :func:`deferred()`, can be undeferred using :func:`undefer_group()`, sending in the group name::
+And an entire "deferred group", i.e. which uses the ``group`` keyword argument to :func:`~sqlalchemy.orm.deferred()`, can be undeferred using :func:`~sqlalchemy.orm.undefer_group()`, sending in the group name::
query = session.query(Book)
query.options(undefer_group('photos')).all()
SQL Expressions as Mapped Attributes
-------------------------------------
-To add a SQL clause composed of local or external columns as a read-only, mapped column attribute, use the :func:`column_property()` function. Any scalar-returning ``ClauseElement`` may be used, as long as it has a ``name`` attribute; usually, you'll want to call ``label()`` to give it a specific name::
+To add a SQL clause composed of local or external columns as a read-only, mapped column attribute, use the :func:`~sqlalchemy.orm.column_property()` function. Any scalar-returning ``ClauseElement`` may be used, as long as it has a ``name`` attribute; usually, you'll want to call ``label()`` to give it a specific name::
mapper(User, users_table, properties={
'fullname': column_property(
Using Relations with Inheritance
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Both joined-table and single table inheritance scenarios produce mappings which are usable in relation() functions; that is, it's possible to map a parent object to a child object which is polymorphic. Similarly, inheriting mappers can have ``relation()`` objects of their own at any level, which are inherited to each child class. The only requirement for relations is that there is a table relationship between parent and child. An example is the following modification to the joined table inheritance example, which sets a bi-directional relationship between ``Employee`` and ``Company``:
+Both joined-table and single table inheritance scenarios produce mappings which are usable in :func:`~sqlalchemy.orm.relation` functions; that is, it's possible to map a parent object to a child object which is polymorphic. Similarly, inheriting mappers can have :func:`~sqlalchemy.orm.relation` objects of their own at any level, which are inherited to each child class. The only requirement for relations is that there is a table relationship between parent and child. An example is the following modification to the joined table inheritance example, which sets a bi-directional relationship between ``Employee`` and ``Company``:
.. sourcecode:: python+sql
'employees': relation(Employee, backref='company')
})
-SQLAlchemy has a lot of experience in this area; the optimized "outer join" approach can be used freely for parent and child relationships, eager loads are fully useable, query aliasing and other tricks are fully supported as well.
+SQLAlchemy has a lot of experience in this area; the optimized "outer join" approach can be used freely for parent and child relationships, eager loads are fully useable, :func:`~sqlalchemy.orm.aliased` objects and other techniques are fully supported as well.
In a concrete inheritance scenario, mapping relations is more difficult since the distinct classes do not share a table. In this case, you *can* establish a relationship from parent to child if a join condition can be constructed from parent to child, if each child table contains a foreign key to the parent:
'employees': relation(Employee)
})
-Let's crank it up and try loading with an eager load:
+The big limitation with concrete table inheritance is that :func:`~sqlalchemy.orm.relation` objects placed on each concrete mapper do **not** propagate to child mappers. If you want to have the same :func:`~sqlalchemy.orm.relation` objects set up on all concrete mappers, they must be configured manually on each. To configure back references in such a configuration the ``back_populates`` keyword may be used instead of ``backref``, such as below where both ``A(object)`` and ``B(A)`` bidirectionally reference ``C``::
-.. sourcecode:: python+sql
-
- session.query(Company).options(eagerload('employees')).all()
- {opensql}
- SELECT anon_1.type AS anon_1_type, anon_1.manager_data AS anon_1_manager_data, anon_1.engineer_info AS anon_1_engineer_info,
- anon_1.employee_id AS anon_1_employee_id, anon_1.name AS anon_1_name, anon_1.company_id AS anon_1_company_id,
- companies.id AS companies_id, companies.name AS companies_name
- FROM companies LEFT OUTER JOIN (SELECT CAST(NULL AS VARCHAR(50)) AS engineer_info, employees.employee_id AS employee_id,
- CAST(NULL AS VARCHAR(50)) AS manager_data, employees.name AS name, employees.company_id AS company_id, 'employee' AS type
- FROM employees UNION ALL SELECT CAST(NULL AS VARCHAR(50)) AS engineer_info, managers.employee_id AS employee_id,
- managers.manager_data AS manager_data, managers.name AS name, managers.company_id AS company_id, 'manager' AS type
- FROM managers UNION ALL SELECT engineers.engineer_info AS engineer_info, engineers.employee_id AS employee_id,
- CAST(NULL AS VARCHAR(50)) AS manager_data, engineers.name AS name, engineers.company_id AS company_id, 'engineer' AS type
- FROM engineers) AS anon_1 ON companies.id = anon_1.company_id
- []
-
-The big limitation with concrete table inheritance is that relation()s placed on each concrete mapper do **not** propagate to child mappers. If you want to have the same relation()s set up on all concrete mappers, they must be configured manually on each.
+ ajoin = polymorphic_union({
+ 'a':a_table,
+ 'b':b_table
+ }, 'type', 'ajoin')
+
+ mapper(A, a_table, with_polymorphic=('*', ajoin),
+ polymorphic_on=ajoin.c.type, polymorphic_identity='a',
+ properties={
+ 'some_c':relation(C, back_populates='many_a')
+ })
+ mapper(B, b_table,inherits=A, concrete=True,
+ polymorphic_identity='b',
+ properties={
+ 'some_c':relation(C, back_populates='many_a')
+ })
+ mapper(C, c_table, properties={
+ 'many_a':relation(A, collection_class=set, back_populates='some_c'),
+ })
+
Mapping a Class against Multiple Tables
----------------------------------------
-
Mappers can be constructed against arbitrary relational units (called ``Selectables``) as well as plain ``Tables``. For example, The ``join`` keyword from the SQL package creates a neat selectable unit comprised of multiple tables, complete with its own composite primary key, which can be passed in to a mapper as the table.
.. sourcecode:: python+sql
Routing Explicit Joins/Statements into Eagerly Loaded Collections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The behavior of :func:`eagerload()` is such that joins are created automatically, the results of which are routed into collections and scalar references on loaded objects. It is often the case that a query already includes the necessary joins which represent a particular collection or scalar reference, and the joins added by the eagerload feature are redundant - yet you'd still like the collections/references to be populated.
+The behavior of :func:`~sqlalchemy.orm.eagerload()` is such that joins are created automatically, the results of which are routed into collections and scalar references on loaded objects. It is often the case that a query already includes the necessary joins which represent a particular collection or scalar reference, and the joins added by the eagerload feature are redundant - yet you'd still like the collections/references to be populated.
-For this SQLAlchemy supplies the :func:`contains_eager()` option. This option is used in the same manner as the :func:`eagerload()` option except it is assumed that the ``Query`` will specify the appropriate joins explicitly. Below it's used with a ``from_statement`` load::
+For this SQLAlchemy supplies the :func:`~sqlalchemy.orm.contains_eager()` option. This option is used in the same manner as the :func:`~sqlalchemy.orm.eagerload()` option except it is assumed that the ``Query`` will specify the appropriate joins explicitly. Below it's used with a ``from_statement`` load::
# mapping is the users->addresses mapping
mapper(User, users_table, properties={
cls._table.update(whereclause, values).execute(**kwargs)
def relate(cls, propname, *args, **kwargs):
- class_mapper(cls)._compile_property(propname, relation(*args, **kwargs))
+ class_mapper(cls)._configure_property(propname, relation(*args, **kwargs))
def _is_outer_join(selectable):
if not isinstance(selectable, sql.Join):
keyword argument.
:param backref:
- indicates the name of a property to be placed on the related
+ indicates the string name of a property to be placed on the related
mapper's class that will handle this relationship in the other
- direction, including synchronizing the object attributes on both
- sides of the relation. Can also point to a :func:`backref` for
- more configurability.
+ direction. The other property will be created automatically
+ when the mappers are configured. Can also be passed as a
+ :func:`backref` object to control the configuration of the
+ new relation.
+
+ :param back_populates:
+ Takes a string name and has the same meaning as ``backref``,
+ except the complementing property is **not** created automatically,
+ and instead must be configured explicitly on the other mapper. The
+ complementing property should also indicate ``back_populates``
+ to this relation to ensure proper functioning.
:param cascade:
a comma-separated list of cascade rules which determines how
class QueryableAttribute(interfaces.PropComparator):
- def __init__(self, impl, comparator=None, parententity=None):
+ def __init__(self, key, impl=None, comparator=None, parententity=None):
"""Construct an InstrumentedAttribute.
comparator
self.comparator = comparator
self.parententity = parententity
- if parententity:
- mapper, selectable, is_aliased_class = _entity_info(parententity, compile=False)
- self.property = mapper._get_property(self.impl.key)
- else:
- self.property = None
-
def get_history(self, instance, **kwargs):
return self.impl.get_history(instance_state(instance), **kwargs)
def __str__(self):
return repr(self.parententity) + "." + self.property.key
+ @property
+ def property(self):
+ return self.comparator.property
+
+
class InstrumentedAttribute(QueryableAttribute):
"""Public-facing descriptor, placed in the mapped class dictionary."""
"""internal implementation for instrumented attributes."""
def __init__(self, class_, key,
- callable_, class_manager, trackparent=False, extension=None,
- compare_function=None, active_history=False, **kwargs):
+ callable_, trackparent=False, extension=None,
+ compare_function=None, active_history=False, parent_token=None, **kwargs):
"""Construct an AttributeImpl.
\class_
- the class to be instrumented.
-
+ associated class
+
key
string name of the attribute
even if it means executing a lazy callable upon attribute change.
This flag is set to True if any extensions are present.
+ parent_token
+ Usually references the MapperProperty, used as a key for
+ the hasparent() function to identify an "owning" attribute.
+ Allows multiple AttributeImpls to all match a single
+ owner attribute.
+
"""
-
self.class_ = class_
self.key = key
self.callable_ = callable_
- self.class_manager = class_manager
self.trackparent = trackparent
+ self.parent_token = parent_token or self
if compare_function is None:
self.is_equal = operator.eq
else:
will also not have a `hasparent` flag.
"""
- return state.parents.get(id(self), optimistic)
+ return state.parents.get(id(self.parent_token), optimistic)
def sethasparent(self, state, value):
"""Set a boolean flag on the given item corresponding to
attribute represented by this ``InstrumentedAttribute``.
"""
- state.parents[id(self)] = value
+ state.parents[id(self.parent_token)] = value
def set_callable(self, state, callable_):
"""Set a callable function for this attribute on the given object.
class_manager, copy_function=None,
compare_function=None, **kwargs):
super(ScalarAttributeImpl, self).__init__(class_, key, callable_,
- class_manager, compare_function=compare_function, **kwargs)
+ compare_function=compare_function, **kwargs)
class_manager.mutable_attributes.add(key)
if copy_function is None:
raise sa_exc.ArgumentError("MutableScalarAttributeImpl requires a copy function")
accepts_scalar_loader = False
uses_objects = True
- def __init__(self, class_, key, callable_, class_manager,
+ def __init__(self, class_, key, callable_,
trackparent=False, extension=None, copy_function=None,
compare_function=None, **kwargs):
super(ScalarObjectAttributeImpl, self).__init__(class_, key,
- callable_, class_manager, trackparent=trackparent, extension=extension,
+ callable_, trackparent=trackparent, extension=extension,
compare_function=compare_function, **kwargs)
if compare_function is None:
self.is_equal = identity_equal
accepts_scalar_loader = False
uses_objects = True
- def __init__(self, class_, key, callable_, class_manager,
+ def __init__(self, class_, key, callable_,
typecallable=None, trackparent=False, extension=None,
copy_function=None, compare_function=None, **kwargs):
- super(CollectionAttributeImpl, self).__init__(class_,
- key, callable_, class_manager, trackparent=trackparent,
+ super(CollectionAttributeImpl, self).__init__(class_, key, callable_, trackparent=trackparent,
extension=extension, compare_function=compare_function, **kwargs)
if copy_function is None:
manager.instantiable = False
manager.unregister()
-def register_attribute(class_, key, uselist, useobject,
- callable_=None, proxy_property=None,
- mutable_scalars=False, impl_class=None, **kwargs):
- manager = manager_of_class(class_)
- if manager.is_instrumented(key):
- return
+def register_attribute(class_, key, **kw):
+ proxy_property = kw.pop('proxy_property', None)
+
+ comparator = kw.pop('comparator', None)
+ parententity = kw.pop('parententity', None)
+ register_descriptor(class_, key, proxy_property, comparator, parententity)
+ if not proxy_property:
+ register_attribute_impl(class_, key, **kw)
+
+def register_attribute_impl(class_, key, **kw):
+
+ manager = manager_of_class(class_)
+ uselist = kw.get('uselist', False)
if uselist:
- factory = kwargs.pop('typecallable', None)
+ factory = kw.pop('typecallable', None)
typecallable = manager.instrument_collection_class(
key, factory or list)
else:
- typecallable = kwargs.pop('typecallable', None)
+ typecallable = kw.pop('typecallable', None)
+
+ manager[key].impl = _create_prop(class_, key, manager, typecallable=typecallable, **kw)
- comparator = kwargs.pop('comparator', None)
- parententity = kwargs.pop('parententity', None)
+def register_descriptor(class_, key, proxy_property=None, comparator=None, parententity=None, property_=None):
+ manager = manager_of_class(class_)
if proxy_property:
proxy_type = proxied_attribute_factory(proxy_property)
descriptor = proxy_type(key, proxy_property, comparator, parententity)
else:
- descriptor = InstrumentedAttribute(
- _create_prop(class_, key, uselist, callable_,
- class_manager=manager,
- useobject=useobject,
- typecallable=typecallable,
- mutable_scalars=mutable_scalars,
- impl_class=impl_class,
- **kwargs),
- comparator=comparator, parententity=parententity)
+ descriptor = InstrumentedAttribute(key, comparator=comparator, parententity=parententity)
manager.instrument_attribute(key, descriptor)
factories.discard(None)
return factories
-def _create_prop(class_, key, uselist, callable_, class_manager, typecallable, useobject, mutable_scalars, impl_class, **kwargs):
+def _create_prop(class_, key, class_manager,
+ uselist=False, callable_=None, typecallable=None,
+ useobject=False, mutable_scalars=False,
+ impl_class=None, **kwargs):
if impl_class:
- return impl_class(class_, key, typecallable, class_manager=class_manager, **kwargs)
+ return impl_class(class_, key, typecallable, **kwargs)
elif uselist:
return CollectionAttributeImpl(class_, key, callable_,
typecallable=typecallable,
- class_manager=class_manager, **kwargs)
+ **kwargs)
elif useobject:
return ScalarObjectAttributeImpl(class_, key, callable_,
- class_manager=class_manager, **kwargs)
+ **kwargs)
elif mutable_scalars:
return MutableScalarAttributeImpl(class_, key, callable_,
class_manager=class_manager, **kwargs)
else:
- return ScalarAttributeImpl(class_, key, callable_,
- class_manager=class_manager, **kwargs)
+ return ScalarAttributeImpl(class_, key, callable_, **kwargs)
def _generate_init(class_, class_manager):
"""Build an __init__ decorator that triggers ClassManager events."""
self.parent = prop.parent
self.secondary = prop.secondary
self.direction = prop.direction
- self.is_backref = prop._is_backref
self.post_update = prop.post_update
self.passive_deletes = prop.passive_deletes
self.passive_updates = prop.passive_updates
self.key = prop.key
self.dependency_marker = MapperStub(self.parent, self.mapper, self.key)
if not self.prop.synchronize_pairs:
- raise sa_exc.ArgumentError("Can't build a DependencyProcessor for relation %s. No target attributes to populate between parent and child are present" % self.prop)
+ raise sa_exc.ArgumentError("Can't build a DependencyProcessor for relation %s. "
+ "No target attributes to populate between parent and child are present" % self.prop)
def _get_instrumented_attribute(self):
"""Return the ``InstrumentedAttribute`` handled by this
``DependencyProecssor``.
+
"""
-
return self.parent.class_manager.get_impl(self.key)
def hasparent(self, state):
"""return True if the given object instance has a parent,
- according to the ``InstrumentedAttribute`` handled by this ``DependencyProcessor``."""
-
+ according to the ``InstrumentedAttribute`` handled by this ``DependencyProcessor``.
+
+ """
# TODO: use correct API for this
return self._get_instrumented_attribute().hasparent(state)
"""Given an object pair assuming `obj2` is a child of `obj1`,
return a tuple with the dependent object second, or None if
there is no dependency.
- """
+ """
if state1 is state2:
return None
elif self.direction == ONETOMANY:
It is called within the context of the various mappers and
sometimes individual objects sorted according to their
insert/update/delete order (topological sort).
- """
+ """
raise NotImplementedError()
def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
through related objects and ensure every instance which will
require save/update/delete is properly added to the
UOWTransaction.
- """
+ """
raise NotImplementedError()
def _verify_canload(self, state):
if state is not None and not self.mapper._canload(state, allow_subtypes=not self.enable_typechecks):
if self.mapper._canload(state, allow_subtypes=True):
- raise exc.FlushError("Attempting to flush an item of type %s on collection '%s', which is not the expected type %s. Configure mapper '%s' to load this subtype polymorphically, or set enable_typechecks=False to allow subtypes. Mismatched typeloading may cause bi-directional relationships (backrefs) to not function properly." % (state.class_, self.prop, self.mapper.class_, self.mapper))
+ raise exc.FlushError("Attempting to flush an item of type %s on collection '%s', "
+ "which is not the expected type %s. Configure mapper '%s' to load this "
+ "subtype polymorphically, or set enable_typechecks=False to allow subtypes. "
+ "Mismatched typeloading may cause bi-directional relationships (backrefs) "
+ "to not function properly." % (state.class_, self.prop, self.mapper.class_, self.mapper))
else:
- raise exc.FlushError("Attempting to flush an item of type %s on collection '%s', whose mapper does not inherit from that of %s." % (state.class_, self.prop, self.mapper.class_))
+ raise exc.FlushError("Attempting to flush an item of type %s on collection '%s', "
+ "whose mapper does not inherit from that of %s." % (state.class_, self.prop, self.mapper.class_))
def _synchronize(self, state, child, associationrow, clearkeys, uowcommit):
"""Called during a flush to synchronize primary key identifier
values between a parent/child object, as well as to an
associationrow in the case of many-to-many.
+
"""
-
raise NotImplementedError()
-
+ def _check_reverse_action(self, uowcommit, parent, child, action):
+ """Determine if an action has been performed by the 'reverse' property of this property.
+
+ this is used to ensure that only one side of a bidirectional relation
+ issues a certain operation for a parent/child pair.
+
+ """
+ for r in self.prop._reverse_property:
+ if (r._dependency_processor, action, parent, child) in uowcommit.attributes:
+ return True
+ return False
+
+ def _performed_action(self, uowcommit, parent, child, action):
+ """Establish that an action has been performed for a certain parent/child pair.
+
+ Used only for actions that are sensitive to bidirectional double-action,
+ i.e. manytomany, post_update.
+
+ """
+ uowcommit.attributes[(self, action, parent, child)] = True
+
def _conditional_post_update(self, state, uowcommit, related):
"""Execute a post_update call.
particular relationship, and given a target object and list of
one or more related objects, and execute the ``UPDATE`` if the
given related object list contains ``INSERT``s or ``DELETE``s.
+
"""
-
if state is not None and self.post_update:
for x in related:
- if x is not None:
+ if x is not None and not self._check_reverse_action(uowcommit, x, state, "postupdate"):
uowcommit.register_object(state, postupdate=True, post_update_cols=[r for l, r in self.prop.synchronize_pairs])
+ self._performed_action(uowcommit, x, state, "postupdate")
break
def _pks_changed(self, uowcommit, state):
raise NotImplementedError()
def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, str(self.prop))
+ return "%s(%s)" % (self.__class__.__name__, self.prop)
class OneToManyDP(DependencyProcessor):
def register_dependencies(self, uowcommit):
if self.post_update:
- if not self.is_backref:
- uowcommit.register_dependency(self.mapper, self.dependency_marker)
- uowcommit.register_dependency(self.parent, self.dependency_marker)
- uowcommit.register_processor(self.dependency_marker, self, self.parent)
+ uowcommit.register_dependency(self.mapper, self.dependency_marker)
+ uowcommit.register_dependency(self.parent, self.dependency_marker)
+ uowcommit.register_processor(self.dependency_marker, self, self.parent)
else:
uowcommit.register_dependency(self.parent, self.mapper)
uowcommit.register_processor(self.parent, self, self.parent)
def process_dependencies(self, task, deplist, uowcommit, delete = False):
- #print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " process_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
if delete:
# head object is being deleted, and we manage its list of child objects
# the child objects have to have their foreign key to the parent set to NULL
self._synchronize(state, child, None, False, uowcommit)
def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
- #print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " preprocess_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
-
if delete:
# head object is being deleted, and we manage its list of child objects
# the child objects have to have their foreign key to the parent set to NULL
def register_dependencies(self, uowcommit):
if self.post_update:
- if not self.is_backref:
- uowcommit.register_dependency(self.mapper, self.dependency_marker)
- uowcommit.register_dependency(self.parent, self.dependency_marker)
- uowcommit.register_processor(self.dependency_marker, self, self.parent)
+ uowcommit.register_dependency(self.mapper, self.dependency_marker)
+ uowcommit.register_dependency(self.parent, self.dependency_marker)
+ uowcommit.register_processor(self.dependency_marker, self, self.parent)
else:
uowcommit.register_dependency(self.mapper, self.parent)
uowcommit.register_processor(self.mapper, self, self.parent)
def process_dependencies(self, task, deplist, uowcommit, delete=False):
- #print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " process_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
if delete:
if self.post_update and not self.cascade.delete_orphan and not self.passive_deletes == 'all':
# post_update means we have to update our row to not reference the child object
self._conditional_post_update(state, uowcommit, history.sum())
def preprocess_dependencies(self, task, deplist, uowcommit, delete=False):
- #print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " PRE process_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
if self.post_update:
return
if delete:
uowcommit.register_processor(self.dependency_marker, self, self.parent)
def process_dependencies(self, task, deplist, uowcommit, delete = False):
- #print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " process_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
connection = uowcommit.transaction.connection(self.mapper)
secondary_delete = []
secondary_insert = []
secondary_update = []
- if self.prop._reverse_property:
- reverse_dep = getattr(self.prop._reverse_property, '_dependency_processor', None)
- else:
- reverse_dep = None
-
if delete:
for state in deplist:
history = uowcommit.get_attribute_history(state, self.key, passive=self.passive_deletes)
if history:
for child in history.non_added():
- if child is None or (reverse_dep and (reverse_dep, "manytomany", child, state) in uowcommit.attributes):
+ if child is None or self._check_reverse_action(uowcommit, child, state, "manytomany"):
continue
associationrow = {}
self._synchronize(state, child, associationrow, False, uowcommit)
secondary_delete.append(associationrow)
- uowcommit.attributes[(self, "manytomany", state, child)] = True
+ self._performed_action(uowcommit, state, child, "manytomany")
else:
for state in deplist:
history = uowcommit.get_attribute_history(state, self.key)
if history:
for child in history.added:
- if child is None or (reverse_dep and (reverse_dep, "manytomany", child, state) in uowcommit.attributes):
+ if child is None or self._check_reverse_action(uowcommit, child, state, "manytomany"):
continue
associationrow = {}
self._synchronize(state, child, associationrow, False, uowcommit)
- uowcommit.attributes[(self, "manytomany", state, child)] = True
+ self._performed_action(uowcommit, state, child, "manytomany")
secondary_insert.append(associationrow)
for child in history.deleted:
- if child is None or (reverse_dep and (reverse_dep, "manytomany", child, state) in uowcommit.attributes):
+ if child is None or self._check_reverse_action(uowcommit, child, state, "manytomany"):
continue
associationrow = {}
self._synchronize(state, child, associationrow, False, uowcommit)
- uowcommit.attributes[(self, "manytomany", state, child)] = True
+ self._performed_action(uowcommit, state, child, "manytomany")
secondary_delete.append(associationrow)
if not self.passive_updates and self._pks_changed(uowcommit, state):
secondary_update.append(associationrow)
if secondary_delete:
- # TODO: precompile the delete/insert queries?
- statement = self.secondary.delete(sql.and_(*[c == sql.bindparam(c.key, type_=c.type) for c in self.secondary.c if c.key in associationrow]))
+ statement = self.secondary.delete(sql.and_(*[
+ c == sql.bindparam(c.key, type_=c.type) for c in self.secondary.c if c.key in associationrow
+ ]))
result = connection.execute(statement, secondary_delete)
if result.supports_sane_multi_rowcount() and result.rowcount != len(secondary_delete):
- raise exc.ConcurrentModificationError("Deleted rowcount %d does not match number of secondary table rows deleted from table '%s': %d" % (result.rowcount, self.secondary.description, len(secondary_delete)))
+ raise exc.ConcurrentModificationError("Deleted rowcount %d does not match number of "
+ "secondary table rows deleted from table '%s': %d" %
+ (result.rowcount, self.secondary.description, len(secondary_delete)))
if secondary_update:
- statement = self.secondary.update(sql.and_(*[c == sql.bindparam("old_" + c.key, type_=c.type) for c in self.secondary.c if c.key in associationrow]))
+ statement = self.secondary.update(sql.and_(*[
+ c == sql.bindparam("old_" + c.key, type_=c.type) for c in self.secondary.c if c.key in associationrow
+ ]))
result = connection.execute(statement, secondary_update)
if result.supports_sane_multi_rowcount() and result.rowcount != len(secondary_update):
- raise exc.ConcurrentModificationError("Updated rowcount %d does not match number of secondary table rows updated from table '%s': %d" % (result.rowcount, self.secondary.description, len(secondary_update)))
+ raise exc.ConcurrentModificationError("Updated rowcount %d does not match number of "
+ "secondary table rows updated from table '%s': %d" %
+ (result.rowcount, self.secondary.description, len(secondary_update)))
if secondary_insert:
statement = self.secondary.insert()
connection.execute(statement, secondary_insert)
def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
- #print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " preprocess_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
if not delete:
for state in deplist:
history = uowcommit.get_attribute_history(state, self.key, passive=True)
class DynaLoader(strategies.AbstractRelationLoader):
def init_class_attribute(self):
self.is_class_level = True
- self._register_attribute(self.parent.class_, impl_class=DynamicAttributeImpl, target_mapper=self.parent_property.mapper, order_by=self.parent_property.order_by, query_class=self.parent_property.query_class)
+
+ strategies._register_attribute(self,
+ useobject=True,
+ impl_class=DynamicAttributeImpl,
+ target_mapper=self.parent_property.mapper,
+ order_by=self.parent_property.order_by,
+ query_class=self.parent_property.query_class
+ )
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
return (None, None)
uses_objects = True
accepts_scalar_loader = False
- def __init__(self, class_, key, typecallable, class_manager,
- target_mapper, order_by, query_class=None, **kwargs):
- super(DynamicAttributeImpl, self).__init__(
- class_, key, typecallable, class_manager, **kwargs)
+ def __init__(self, class_, key, typecallable,
+ target_mapper, order_by, query_class=None, **kwargs):
+ super(DynamicAttributeImpl, self).__init__(class_, key, typecallable, **kwargs)
self.target_mapper = target_mapper
self.order_by = order_by
if not query_class:
def set_parent(self, parent):
self.parent = parent
- def init(self, key, parent):
+ def instrument_class(self, mapper):
+ raise NotImplementedError()
+
+ def init(self):
"""Called after all mappers are compiled to assemble
relationships between mappers, establish instrumented class
attributes.
"""
- self.key = key
self._compiled = True
self.do_init()
ColumnProperty = None
SynonymProperty = None
ComparableProperty = None
+RelationProperty = None
+ConcreteInheritedProperty = None
_expire_state = None
_state_session = None
self.order_by = util.to_list(order_by)
else:
self.order_by = order_by
-
+
self.always_refresh = always_refresh
self.version_id_col = version_id_col
self.concrete = concrete
# load custom properties
if self._init_properties:
for key, prop in self._init_properties.iteritems():
- self._compile_property(key, prop, False)
+ self._configure_property(key, prop, False)
# pull properties from the inherited mapper if any.
if self.inherits:
for key, prop in self.inherits._props.iteritems():
if key not in self._props and not self._should_exclude(key, local=False):
- self._adapt_inherited_property(key, prop)
+ self._adapt_inherited_property(key, prop, False)
# create properties for each column in the mapped table,
# for those columns which don't already map to a property
if column in mapper._columntoproperty:
column_key = mapper._columntoproperty[column].key
- self._compile_property(column_key, column, init=False, setparent=True)
+ self._configure_property(column_key, column, init=False, setparent=True)
# do a special check for the "discriminiator" column, as it may only be present
# in the 'with_polymorphic' selectable but we need it for the base mapper
if self.polymorphic_on and self.polymorphic_on not in self._columntoproperty:
- col = self.mapped_table.corresponding_column(self.polymorphic_on) or self.polymorphic_on
+ col = self.mapped_table.corresponding_column(self.polymorphic_on)
+ if not col:
+ dont_instrument = True
+ col = self.polymorphic_on
+ else:
+ dont_instrument = False
if self._should_exclude(col.key, local=False):
raise sa_exc.InvalidRequestError("Cannot exclude or override the discriminator column %r" % col.key)
- self._compile_property(col.key, ColumnProperty(col), init=False, setparent=True)
+ self._configure_property(col.key, ColumnProperty(col, _no_instrument=dont_instrument), init=False, setparent=True)
- def _adapt_inherited_property(self, key, prop):
+ def _adapt_inherited_property(self, key, prop, init):
if not self.concrete:
- self._compile_property(key, prop, init=False, setparent=False)
- # TODO: concrete properties dont adapt at all right now....will require copies of relations() etc.
-
- class _CompileOnAttr(PropComparator):
- """A placeholder descriptor which triggers compilation on access."""
-
- def __init__(self, class_, key):
- self.class_ = class_
- self.key = key
- self.existing_prop = getattr(class_, key, None)
-
- def __getattribute__(self, key):
- cls = object.__getattribute__(self, 'class_')
- clskey = object.__getattribute__(self, 'key')
-
- # ugly hack
- if key.startswith('__') and key != '__clause_element__':
- return object.__getattribute__(self, key)
-
- class_mapper(cls)
-
- if cls.__dict__.get(clskey) is self:
- # if this warning occurs, it usually means mapper
- # compilation has failed, but operations upon the mapped
- # classes have proceeded.
- util.warn(
- ("Attribute '%s' on class '%s' was not replaced during "
- "mapper compilation operation") % (clskey, cls.__name__))
- # clean us up explicitly
- delattr(cls, clskey)
-
- return getattr(getattr(cls, clskey), key)
-
- def _compile_property(self, key, prop, init=True, setparent=True):
- self._log("_compile_property(%s, %s)" % (key, prop.__class__.__name__))
+ self._configure_property(key, prop, init=False, setparent=False)
+ elif key not in self._props:
+ self._configure_property(key, ConcreteInheritedProperty(), init=init, setparent=True)
+
+ def _configure_property(self, key, prop, init=True, setparent=True):
+ self._log("_configure_property(%s, %s)" % (key, prop.__class__.__name__))
if not isinstance(prop, MapperProperty):
# we were passed a Column or a list of Columns; generate a ColumnProperty
prop = prop.copy()
prop.columns.append(column)
self._log("appending to existing ColumnProperty %s" % (key))
- elif prop is None:
+ elif prop is None or isinstance(prop, ConcreteInheritedProperty):
mapped_column = []
for c in columns:
mc = self.mapped_table.corresponding_column(c)
elif isinstance(prop, (ComparableProperty, SynonymProperty)) and setparent:
if prop.descriptor is None:
desc = getattr(self.class_, key, None)
- if isinstance(desc, Mapper._CompileOnAttr):
- desc = object.__getattribute__(desc, 'existing_prop')
if self._is_userland_descriptor(desc):
prop.descriptor = desc
if getattr(prop, 'map_column', False):
raise sa_exc.ArgumentError(
"Can't compile synonym '%s': no column on table '%s' named '%s'"
% (prop.name, self.mapped_table.description, key))
- self._compile_property(prop.name, ColumnProperty(self.mapped_table.c[key]), init=init, setparent=setparent)
+ self._configure_property(prop.name, ColumnProperty(self.mapped_table.c[key]), init=init, setparent=setparent)
self._props[key] = prop
prop.key = key
if setparent:
prop.set_parent(self)
- if not self.non_primary:
- self.class_manager.install_descriptor(
- key, Mapper._CompileOnAttr(self.class_, key))
+ if not self.non_primary:
+ prop.instrument_class(self)
+
+ for mapper in self._inheriting_mappers:
+ mapper._adapt_inherited_property(key, prop, init)
if init:
- prop.init(key, self)
+ prop.init()
- for mapper in self._inheriting_mappers:
- mapper._adapt_inherited_property(key, prop)
def compile(self):
"""Compile this mapper and all other non-compiled mappers.
for key, prop in l:
if not getattr(prop, '_compiled', False):
self._log("initialize prop " + key)
- prop.init(key, self)
+ prop.init()
self._log("_post_configure_properties() complete")
self.compiled = True
"""
self._init_properties[key] = prop
- self._compile_property(key, prop, init=self.compiled)
+ self._configure_property(key, prop, init=self.compiled)
# class formatting / logging.
construct an outerjoin amongst those mapper's mapped tables.
"""
+
from_obj = self.mapped_table
for m in mappers:
if m is self:
self.columns = [expression._labeled(c) for c in columns]
self.group = kwargs.pop('group', None)
self.deferred = kwargs.pop('deferred', False)
+ self.no_instrument = kwargs.pop('_no_instrument', False)
self.comparator_factory = kwargs.pop('comparator_factory', self.__class__.Comparator)
self.descriptor = kwargs.pop('descriptor', None)
self.extension = kwargs.pop('extension', None)
util.set_creation_order(self)
- if self.deferred:
+ if self.no_instrument:
+ self.strategy_class = strategies.UninstrumentedColumnLoader
+ elif self.deferred:
self.strategy_class = strategies.DeferredColumnLoader
else:
self.strategy_class = strategies.ColumnLoader
-
+
+ def instrument_class(self, mapper):
+ if self.no_instrument:
+ return
+
+ attributes.register_descriptor(
+ mapper.class_,
+ self.key,
+ comparator=self.comparator_factory(self, mapper),
+ parententity=mapper,
+ property_=self
+ )
+
def do_init(self):
super(ColumnProperty, self).do_init()
if len(self.columns) > 1 and self.parent.primary_key.issuperset(self.columns):
("On mapper %s, primary key column '%s' is being combined "
"with distinct primary key column '%s' in attribute '%s'. "
"Use explicit properties to give each column its own mapped "
- "attribute name.") % (str(self.parent), str(self.columns[1]),
- str(self.columns[0]), self.key))
+ "attribute name.") % (self.parent, self.columns[1],
+ self.columns[0], self.key))
def copy(self):
return ColumnProperty(deferred=self.deferred, group=self.group, *self.columns)
col = self.__clause_element__()
return op(col._bind_param(other), col, **kwargs)
+ # TODO: legacy..do we need this ? (0.5)
ColumnComparator = Comparator
def __str__(self):
self.composite_class = class_
self.strategy_class = strategies.CompositeColumnLoader
- def do_init(self):
- super(ColumnProperty, self).do_init()
- # TODO: similar PK check as ColumnProperty does ?
-
def copy(self):
return CompositeProperty(deferred=self.deferred, group=self.group, composite_class=self.composite_class, *self.columns)
+ def do_init(self):
+ # skip over ColumnProperty's do_init(),
+ # which issues assertions that do not apply to CompositeColumnProperty
+ super(ColumnProperty, self).do_init()
+
def getattr(self, state, column):
obj = state.get_impl(self.key).get(state)
return self.get_col_value(column, obj)
def __str__(self):
return str(self.parent.class_.__name__) + "." + self.key
+class ConcreteInheritedProperty(MapperProperty):
+ extension = None
+
+ def setup(self, context, entity, path, adapter, **kwargs):
+ pass
+
+ def create_row_processor(self, selectcontext, path, mapper, row, adapter):
+ return (None, None)
+
+ def instrument_class(self, mapper):
+ def warn():
+ raise AttributeError("Concrete %s does not implement attribute %r at "
+ "the instance level. Add this property explicitly to %s." %
+ (self.parent, self.key, self.parent))
+
+ class NoninheritedConcreteProp(object):
+ def __set__(s, obj, value):
+ warn()
+ def __delete__(s, obj):
+ warn()
+ def __get__(s, obj, owner):
+ warn()
+
+ comparator_callable = None
+ # TODO: put this process into a deferred callable?
+ for m in self.parent.iterate_to_root():
+ p = m._get_property(self.key)
+ if not isinstance(p, ConcreteInheritedProperty):
+ comparator_callable = p.comparator_factory
+ break
+
+ attributes.register_descriptor(
+ mapper.class_,
+ self.key,
+ comparator=comparator_callable(self, mapper),
+ parententity=mapper,
+ property_=self,
+ proxy_property=NoninheritedConcreteProp()
+ )
+
+
class SynonymProperty(MapperProperty):
extension = None
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
return (None, None)
- def do_init(self):
+ def instrument_class(self, mapper):
class_ = self.parent.class_
- self.logger.info("register managed attribute %s on class %s" % (self.key, class_.__name__))
if self.descriptor is None:
class SynonymProp(object):
def __set__(s, obj, value):
return prop.comparator_factory(prop, mapper)
return comparator
- strategies.DefaultColumnLoader(self)._register_attribute(
- None, None, False, comparator_callable, proxy_property=self.descriptor)
+ attributes.register_descriptor(
+ mapper.class_,
+ self.key,
+ comparator=comparator_callable(self, mapper),
+ parententity=mapper,
+ property_=self,
+ proxy_property=self.descriptor
+ )
def merge(self, session, source, dest, dont_load, _recursive):
pass
self.comparator_factory = comparator_factory
util.set_creation_order(self)
- def do_init(self):
+ def instrument_class(self, mapper):
"""Set up a proxy to the unmanaged descriptor."""
- strategies.DefaultColumnLoader(self)._register_attribute(None, None, False, self.comparator_factory, proxy_property=self.descriptor)
+ attributes.register_descriptor(
+ mapper.class_,
+ self.key,
+ comparator=self.comparator_factory(self, mapper),
+ parententity=mapper,
+ property_=self,
+ proxy_property=self.descriptor
+ )
def setup(self, context, entity, path, adapter, **kwargs):
pass
"""
def __init__(self, argument,
- secondary=None, primaryjoin=None, secondaryjoin=None,
- foreign_keys=None, uselist=None, order_by=False, backref=None,
- _is_backref=False, post_update=False, cascade=False,
- extension=None, viewonly=False, lazy=True,
- collection_class=None, passive_deletes=False,
- passive_updates=True, remote_side=None,
- enable_typechecks=True, join_depth=None,
- comparator_factory=None, strategy_class=None,
- _local_remote_pairs=None, query_class=None):
+ secondary=None, primaryjoin=None,
+ secondaryjoin=None,
+ foreign_keys=None,
+ uselist=None,
+ order_by=False,
+ backref=None,
+ back_populates=None,
+ post_update=False,
+ cascade=False, extension=None,
+ viewonly=False, lazy=True,
+ collection_class=None, passive_deletes=False,
+ passive_updates=True, remote_side=None,
+ enable_typechecks=True, join_depth=None,
+ comparator_factory=None,
+ strategy_class=None, _local_remote_pairs=None, query_class=None):
+
self.uselist = uselist
self.argument = argument
self.secondary = secondary
else:
self.strategy_class = strategies.LazyLoader
- self._reverse_property = None
+ self._reverse_property = set()
if cascade is not False:
self.cascade = CascadeOptions(cascade)
self.order_by = order_by
- if isinstance(backref, str):
+ self.back_populates = back_populates
+
+ if self.back_populates:
+ if backref:
+ raise sa_exc.ArgumentError("backref and back_populates keyword arguments are mutually exclusive")
+ self.backref = None
+ elif isinstance(backref, str):
# propagate explicitly sent primary/secondary join conditions to the BackRef object if
# just a string was sent
if secondary is not None:
# reverse primary/secondary in case of a many-to-many
- self.backref = BackRef(backref, primaryjoin=secondaryjoin, secondaryjoin=primaryjoin, passive_updates=self.passive_updates)
+ self.backref = BackRef(backref, primaryjoin=secondaryjoin,
+ secondaryjoin=primaryjoin, passive_updates=self.passive_updates)
else:
- self.backref = BackRef(backref, primaryjoin=primaryjoin, secondaryjoin=secondaryjoin, passive_updates=self.passive_updates)
+ self.backref = BackRef(backref, primaryjoin=primaryjoin,
+ secondaryjoin=secondaryjoin, passive_updates=self.passive_updates)
else:
self.backref = backref
- self._is_backref = _is_backref
+
+ def instrument_class(self, mapper):
+ attributes.register_descriptor(
+ mapper.class_,
+ self.key,
+ comparator=self.comparator_factory(self, mapper),
+ parententity=mapper,
+ property_=self
+ )
class Comparator(PropComparator):
def __init__(self, prop, mapper, of_type=None, adapter=None):
- self.prop = self.property = prop
+ self.prop = prop
self.mapper = mapper
self.adapter = adapter
if of_type:
on the local side of generated expressions.
"""
- return self.__class__(self.prop, self.mapper, getattr(self, '_of_type', None), adapter)
-
+ return self.__class__(self.property, self.mapper, getattr(self, '_of_type', None), adapter)
+
@property
def parententity(self):
- return self.prop.parent
+ return self.property.parent
def __clause_element__(self):
- elem = self.prop.parent._with_polymorphic_selectable
+ elem = self.property.parent._with_polymorphic_selectable
if self.adapter:
return self.adapter(elem)
else:
return op(self, *other, **kwargs)
def of_type(self, cls):
- return RelationProperty.Comparator(self.prop, self.mapper, cls)
+ return RelationProperty.Comparator(self.property, self.mapper, cls)
def in_(self, other):
raise NotImplementedError("in_() not yet supported for relations. For a "
def __eq__(self, other):
if other is None:
- if self.prop.direction in [ONETOMANY, MANYTOMANY]:
+ if self.property.direction in [ONETOMANY, MANYTOMANY]:
return ~self._criterion_exists()
else:
- return self.prop._optimized_compare(None, adapt_source=self.adapter)
- elif self.prop.uselist:
+ return self.property._optimized_compare(None, adapt_source=self.adapter)
+ elif self.property.uselist:
raise sa_exc.InvalidRequestError("Can't compare a collection to an object or collection; use contains() to test for membership.")
else:
- return self.prop._optimized_compare(other, adapt_source=self.adapter)
+ return self.property._optimized_compare(other, adapt_source=self.adapter)
def _criterion_exists(self, criterion=None, **kwargs):
if getattr(self, '_of_type', None):
target_mapper = self._of_type
to_selectable = target_mapper._with_polymorphic_selectable
- if self.prop._is_self_referential():
+ if self.property._is_self_referential():
to_selectable = to_selectable.alias()
single_crit = target_mapper._single_table_criterion
source_selectable = None
pj, sj, source, dest, secondary, target_adapter = \
- self.prop._create_joins(dest_polymorphic=True, dest_selectable=to_selectable, source_selectable=source_selectable)
+ self.property._create_joins(dest_polymorphic=True, dest_selectable=to_selectable, source_selectable=source_selectable)
for k in kwargs:
- crit = self.prop.mapper.class_manager[k] == kwargs[k]
+ crit = self.property.mapper.class_manager[k] == kwargs[k]
if criterion is None:
criterion = crit
else:
if sj:
j = _orm_annotate(pj) & sj
else:
- j = _orm_annotate(pj, exclude=self.prop.remote_side)
+ j = _orm_annotate(pj, exclude=self.property.remote_side)
if criterion and target_adapter:
# limit this adapter to annotated only?
return sql.exists([1], crit, from_obj=dest).correlate(source)
def any(self, criterion=None, **kwargs):
- if not self.prop.uselist:
+ if not self.property.uselist:
raise sa_exc.InvalidRequestError("'any()' not implemented for scalar attributes. Use has().")
return self._criterion_exists(criterion, **kwargs)
def has(self, criterion=None, **kwargs):
- if self.prop.uselist:
+ if self.property.uselist:
raise sa_exc.InvalidRequestError("'has()' not implemented for collections. Use any().")
return self._criterion_exists(criterion, **kwargs)
def contains(self, other, **kwargs):
- if not self.prop.uselist:
+ if not self.property.uselist:
raise sa_exc.InvalidRequestError("'contains' not implemented for scalar attributes. Use ==")
- clause = self.prop._optimized_compare(other, adapt_source=self.adapter)
+ clause = self.property._optimized_compare(other, adapt_source=self.adapter)
- if self.prop.secondaryjoin:
+ if self.property.secondaryjoin:
clause.negation_clause = self.__negated_contains_or_equals(other)
return clause
def __negated_contains_or_equals(self, other):
- if self.prop.direction == MANYTOONE:
+ if self.property.direction == MANYTOONE:
state = attributes.instance_state(other)
- strategy = self.prop._get_strategy(strategies.LazyLoader)
+ strategy = self.property._get_strategy(strategies.LazyLoader)
def state_bindparam(state, col):
o = state.obj() # strong ref
- return lambda: self.prop.mapper._get_committed_attr_by_column(o, col)
+ return lambda: self.property.mapper._get_committed_attr_by_column(o, col)
def adapt(col):
if self.adapter:
sql.or_(
adapt(x) != state_bindparam(state, y),
adapt(x) == None)
- for (x, y) in self.prop.local_remote_pairs])
+ for (x, y) in self.property.local_remote_pairs])
- criterion = sql.and_(*[x==y for (x, y) in zip(self.prop.mapper.primary_key, self.prop.mapper.primary_key_from_instance(other))])
+ criterion = sql.and_(*[x==y for (x, y) in zip(self.property.mapper.primary_key, self.property.mapper.primary_key_from_instance(other))])
return ~self._criterion_exists(criterion)
def __ne__(self, other):
if other is None:
- if self.prop.direction == MANYTOONE:
- return sql.or_(*[x!=None for x in self.prop._foreign_keys])
+ if self.property.direction == MANYTOONE:
+ return sql.or_(*[x!=None for x in self.property._foreign_keys])
else:
return self._criterion_exists()
- elif self.prop.uselist:
+ elif self.property.uselist:
raise sa_exc.InvalidRequestError("Can't compare a collection to an object or collection; use contains() to test for membership.")
else:
return self.__negated_contains_or_equals(other)
+ @util.memoized_property
+ def property(self):
+ self.prop.parent.compile()
+ return self.prop
+
def compare(self, op, value, value_is_parent=False):
if op == operators.eq:
if value is None:
return str(self.parent.class_.__name__) + "." + self.key
def merge(self, session, source, dest, dont_load, _recursive):
- if not dont_load and self._reverse_property and (source, self._reverse_property) in _recursive:
- return
+ if not dont_load:
+ # TODO: no test coverage for recursive check
+ for r in self._reverse_property:
+ if (source, r) in _recursive:
+ return
source_state = attributes.instance_state(source)
dest_state = attributes.instance_state(dest)
obj = session.merge(current, dont_load=dont_load, _recursive=_recursive)
if obj is not None:
if dont_load:
- dest.__dict__[self.key] = obj
+ dest_state.dict[self.key] = obj
else:
setattr(dest, self.key, obj)
for c in instances:
if c is not None and c not in visited_instances and (halt_on is None or not halt_on(c)):
if not isinstance(c, self.mapper.class_):
- raise AssertionError("Attribute '%s' on class '%s' doesn't handle objects of type '%s'" % (self.key, str(self.parent.class_), str(c.__class__)))
+ raise AssertionError("Attribute '%s' on class '%s' doesn't handle objects "
+ "of type '%s'" % (self.key, str(self.parent.class_), str(c.__class__)))
visited_instances.add(c)
# cascade using the mapper local to this object, so that its individual properties are located
instance_mapper = object_mapper(c)
yield (c, instance_mapper, attributes.instance_state(c))
- def _get_target_class(self):
- """Return the target class of the relation, even if the
- property has not been initialized yet.
-
- """
- if isinstance(self.argument, type):
- return self.argument
- else:
- return self.argument.class_
-
+ def _add_reverse_property(self, key):
+ other = self.mapper._get_property(key)
+ self._reverse_property.add(other)
+ other._reverse_property.add(self)
+
+ if not other._get_target().common_parent(self.parent):
+ raise sa_exc.ArgumentError("reverse_property %r on relation %s references "
+ "relation %s, which does not reference mapper %s" % (key, self, other, self.parent))
+
def do_init(self):
- self._determine_targets()
+ self._get_target()
+ self._process_dependent_arguments()
self._determine_joins()
self._determine_synchronize_pairs()
self._determine_direction()
self._determine_local_remote_pairs()
self._post_init()
- def _determine_targets(self):
- if isinstance(self.argument, type):
- self.mapper = mapper.class_mapper(self.argument, compile=False)
- elif isinstance(self.argument, mapper.Mapper):
- self.mapper = self.argument
- elif util.callable(self.argument):
- # accept a callable to suit various deferred-configurational schemes
- self.mapper = mapper.class_mapper(self.argument(), compile=False)
- else:
- raise sa_exc.ArgumentError("relation '%s' expects a class or a mapper argument (received: %s)" % (self.key, type(self.argument)))
- assert isinstance(self.mapper, mapper.Mapper), self.mapper
+ def _get_target(self):
+ if not hasattr(self, 'mapper'):
+ if isinstance(self.argument, type):
+ self.mapper = mapper.class_mapper(self.argument, compile=False)
+ elif isinstance(self.argument, mapper.Mapper):
+ self.mapper = self.argument
+ elif util.callable(self.argument):
+ # accept a callable to suit various deferred-configurational schemes
+ self.mapper = mapper.class_mapper(self.argument(), compile=False)
+ else:
+ raise sa_exc.ArgumentError("relation '%s' expects a class or a mapper argument (received: %s)" % (self.key, type(self.argument)))
+ assert isinstance(self.mapper, mapper.Mapper), self.mapper
+ return self.mapper
+
+ def _process_dependent_arguments(self):
# accept callables for other attributes which may require deferred initialization
for attr in ('order_by', 'primaryjoin', 'secondaryjoin', 'secondary', '_foreign_keys', 'remote_side'):
# primary property handler, set up class attributes
if self.is_primary():
+ if self.back_populates:
+ self.extension = util.to_list(self.extension) or []
+ self.extension.append(attributes.GenericBackrefExtension(self.back_populates))
+ self._add_reverse_property(self.back_populates)
+
if self.backref is not None:
self.backref.compile(self)
elif not mapper.class_mapper(self.parent.class_, compile=False)._get_property(self.key, raiseerr=False):
"a non-primary mapper on class '%s'. New relations can only be "
"added to the primary mapper, i.e. the very first "
"mapper created for class '%s' " % (self.key, self.parent.class_.__name__, self.parent.class_.__name__))
-
+
super(RelationProperty, self).do_init()
def _refers_to_parent_table(self):
class BackRef(object):
"""Attached to a RelationProperty to indicate a complementary reverse relationship.
- Can optionally create the complementing RelationProperty if one does not exist already."""
+ Handles the job of creating the opposite RelationProperty according to configuration.
+
+ Alternatively, two explicit RelationProperty objects can be associated bidirectionally
+ using the back_populates keyword argument on each.
+
+ """
def __init__(self, key, _prop=None, **kwargs):
self.key = key
relation = RelationProperty(parent, prop.secondary, pj, sj,
backref=BackRef(prop.key, _prop=prop),
- _is_backref=True,
**self.kwargs)
- mapper._compile_property(self.key, relation);
+ mapper._configure_property(self.key, relation);
- prop._reverse_property = mapper._get_property(self.key)
- mapper._get_property(self.key)._reverse_property = prop
+ prop._add_reverse_property(self.key)
else:
raise sa_exc.ArgumentError("Error creating backref '%s' on relation '%s': "
mapper.ColumnProperty = ColumnProperty
mapper.SynonymProperty = SynonymProperty
mapper.ComparableProperty = ComparableProperty
+mapper.RelationProperty = RelationProperty
+mapper.ConcreteInheritedProperty = ConcreteInheritedProperty
\ No newline at end of file
if isinstance(column, basestring):
column = sql.literal_column(column)
self._result_label = column.name
- elif isinstance(column, (attributes.QueryableAttribute, mapper.Mapper._CompileOnAttr)):
- self._result_label = column.impl.key
+ elif isinstance(column, attributes.QueryableAttribute):
+ self._result_label = column.property.key
column = column.__clause_element__()
else:
self._result_label = getattr(column, 'key', None)
return util.IdentitySet(self._new.values())
_expire_state = attributes.InstanceState.expire_attributes
-register_attribute = unitofwork.register_attribute
+UOWEventHandler = unitofwork.UOWEventHandler
_sessions = weakref.WeakValueDictionary()
from sqlalchemy.orm import session as sessionlib
from sqlalchemy.orm import util as mapperutil
+def _register_attribute(strategy, useobject,
+ compare_function=None,
+ typecallable=None,
+ copy_function=None,
+ mutable_scalars=False,
+ uselist=False,
+ callable_=None,
+ proxy_property=None,
+ active_history=False,
+ impl_class=None,
+ **kw
+):
+
+ prop = strategy.parent_property
+ attribute_ext = util.to_list(prop.extension) or []
+ if getattr(prop, 'backref', None):
+ attribute_ext.append(prop.backref.extension)
+
+ if prop.key in prop.parent._validators:
+ attribute_ext.append(mapperutil.Validator(prop.key, prop.parent._validators[prop.key]))
+
+ if useobject:
+ attribute_ext.append(sessionlib.UOWEventHandler(prop.key))
+
+ for mapper in prop.parent.polymorphic_iterator():
+ if (mapper is prop.parent or not mapper.concrete) and mapper.has_property(prop.key):
+ attributes.register_attribute_impl(
+ mapper.class_,
+ prop.key,
+ parent_token=prop,
+ mutable_scalars=mutable_scalars,
+ uselist=uselist,
+ copy_function=copy_function,
+ compare_function=compare_function,
+ useobject=useobject,
+ extension=attribute_ext,
+ trackparent=useobject,
+ typecallable=typecallable,
+ callable_=callable_,
+ active_history=active_history,
+ impl_class=impl_class,
+ **kw
+ )
-class DefaultColumnLoader(LoaderStrategy):
- def _register_attribute(self, compare_function, copy_function, mutable_scalars,
- comparator_factory, callable_=None, proxy_property=None, active_history=False):
- self.logger.info("%s register managed attribute" % self)
-
- attribute_ext = util.to_list(self.parent_property.extension) or []
- if self.key in self.parent._validators:
- attribute_ext.append(mapperutil.Validator(self.key, self.parent._validators[self.key]))
-
- for mapper in self.parent.polymorphic_iterator():
- if (mapper is self.parent or not mapper.concrete) and mapper.has_property(self.key):
- sessionlib.register_attribute(
- mapper.class_,
- self.key,
- uselist=False,
- useobject=False,
- copy_function=copy_function,
- compare_function=compare_function,
- mutable_scalars=mutable_scalars,
- comparator=comparator_factory(self.parent_property, mapper),
- parententity=mapper,
- callable_=callable_,
- extension=attribute_ext,
- proxy_property=proxy_property,
- active_history=active_history
- )
-
-log.class_logger(DefaultColumnLoader)
+class UninstrumentedColumnLoader(LoaderStrategy):
+ """Represent the strategy for a MapperProperty that doesn't instrument the class.
-class ColumnLoader(DefaultColumnLoader):
+ The polymorphic_on argument of mapper() often results in this,
+ if the argument is against the with_polymorphic selectable.
+
+ """
+ def init(self):
+ self.columns = self.parent_property.columns
+
+ def setup_query(self, context, entity, path, adapter, column_collection=None, **kwargs):
+ for c in self.columns:
+ if adapter:
+ c = adapter.columns[c]
+ column_collection.append(c)
+
+ def create_row_processor(self, selectcontext, path, mapper, row, adapter):
+ return (None, None)
+
+class ColumnLoader(LoaderStrategy):
+ """Strategize the loading of a plain column-based MapperProperty."""
def init(self):
self.columns = self.parent_property.columns
self.is_class_level = True
coltype = self.columns[0].type
active_history = self.columns[0].primary_key # TODO: check all columns ? check for foreign Key as well?
-
- self._register_attribute(
- coltype.compare_values,
- coltype.copy_value,
- self.columns[0].type.is_mutable(),
- self.parent_property.comparator_factory,
+
+ _register_attribute(self, useobject=False,
+ compare_function=coltype.compare_values,
+ copy_function=coltype.copy_value,
+ mutable_scalars=self.columns[0].type.is_mutable(),
active_history = active_history
-
)
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
log.class_logger(ColumnLoader)
class CompositeColumnLoader(ColumnLoader):
+ """Strategize the loading of a composite column-based MapperProperty."""
+
def init_class_attribute(self):
self.is_class_level = True
self.logger.info("%s register managed composite attribute" % self)
else:
return True
- self._register_attribute(
- compare,
- copy,
- True,
- self.parent_property.comparator_factory
+ _register_attribute(self, useobject=False,
+ compare_function=compare,
+ copy_function=copy,
+ mutable_scalars=True
+ #active_history ?
)
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
log.class_logger(CompositeColumnLoader)
-class DeferredColumnLoader(DefaultColumnLoader):
- """Deferred column loader, a per-column or per-column-group lazy loader."""
+class DeferredColumnLoader(LoaderStrategy):
+ """Strategize the loading of a deferred column-based MapperProperty."""
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
col = self.columns[0]
def init_class_attribute(self):
self.is_class_level = True
- self._register_attribute(
- self.columns[0].type.compare_values,
- self.columns[0].type.copy_value,
- self.columns[0].type.is_mutable(),
- self.parent_property.comparator_factory,
+
+ _register_attribute(self, useobject=False,
+ compare_function=self.columns[0].type.compare_values,
+ copy_function=self.columns[0].type.copy_value,
+ mutable_scalars=self.columns[0].type.is_mutable(),
callable_=self.class_level_loader,
)
query._attributes[('undefer', self.group)] = True
class AbstractRelationLoader(LoaderStrategy):
+ """LoaderStratgies which deal with related objects as opposed to scalars."""
+
def init(self):
for attr in ['mapper', 'target', 'table', 'uselist']:
setattr(self, attr, getattr(self.parent_property, attr))
state.set_callable(self.key, callable_)
else:
state.initialize(self.key)
-
- def _register_attribute(self, class_, callable_=None, impl_class=None, **kwargs):
- self.logger.info("%s register managed %s attribute" % (self, (self.uselist and "collection" or "scalar")))
-
- attribute_ext = util.to_list(self.parent_property.extension) or []
-
- if self.parent_property.backref:
- attribute_ext.append(self.parent_property.backref.extension)
-
- if self.key in self.parent._validators:
- attribute_ext.append(mapperutil.Validator(self.key, self.parent._validators[self.key]))
-
- sessionlib.register_attribute(
- class_,
- self.key,
- uselist=self.uselist,
- useobject=True,
- extension=attribute_ext,
- trackparent=True,
- typecallable=self.parent_property.collection_class,
- callable_=callable_,
- comparator=self.parent_property.comparator,
- parententity=self.parent,
- impl_class=impl_class,
- **kwargs
- )
class NoLoader(AbstractRelationLoader):
+ """Strategize a relation() that doesn't load data automatically."""
+
def init_class_attribute(self):
self.is_class_level = True
- self._register_attribute(self.parent.class_)
+
+ _register_attribute(self,
+ useobject=True,
+ uselist=self.parent_property.uselist,
+ typecallable = self.parent_property.collection_class,
+ )
def create_row_processor(self, selectcontext, path, mapper, row, adapter):
def new_execute(state, row, **flags):
log.class_logger(NoLoader)
class LazyLoader(AbstractRelationLoader):
+ """Strategize a relation() that loads when first accessed."""
+
def init(self):
super(LazyLoader, self).init()
(self.__lazywhere, self.__bind_to_col, self._equated_columns) = self._create_lazy_clause(self.parent_property)
def init_class_attribute(self):
self.is_class_level = True
- self._register_attribute(self.parent.class_, callable_=self.class_level_loader)
+
+
+ _register_attribute(self,
+ useobject=True,
+ callable_=self.class_level_loader,
+ uselist = self.parent_property.uselist,
+ typecallable = self.parent_property.collection_class,
+ )
def lazy_clause(self, state, reverse_direction=False, alias_secondary=False, adapt_source=None):
if state is None:
return None
class EagerLoader(AbstractRelationLoader):
- """Loads related objects inline with a parent query."""
+ """Strategize a relation() that loads within the process of the parent object being selected."""
def init(self):
super(EagerLoader, self).init()
sess.expunge(oldvalue)
return newvalue
-def register_attribute(class_, key, *args, **kwargs):
- """Register an attribute with the attributes module.
-
- Overrides attributes.register_attribute() to add
- unitofwork-specific event handlers.
-
- """
- useobject = kwargs.get('useobject', False)
- if useobject:
- # for object-holding attributes, instrument UOWEventHandler
- # to process per-attribute cascades
- extension = util.to_list(kwargs.pop('extension', None) or [])
- extension.append(UOWEventHandler(key))
-
- kwargs['extension'] = extension
- return attributes.register_attribute(class_, key, *args, **kwargs)
-
class UOWTransaction(object):
"""Handles the details of organizing and executing transaction
existing = getattr(self.__target, prop.key)
comparator = existing.comparator.adapted(self.__adapt_element)
- queryattr = attributes.QueryableAttribute(
- existing.impl, parententity=self, comparator=comparator)
+ queryattr = attributes.QueryableAttribute(prop.key,
+ impl=existing.impl, parententity=self, comparator=comparator)
setattr(self, prop.key, queryattr)
return queryattr
def _init_items(self, *args):
for column in args:
- self.append_column(column)
+ self.append_column(_to_schema_column(column))
def _set_parent(self, table):
self.table = table
for key in ('on', 'context')
if getattr(self, key)]))
-
+def _to_schema_column(element):
+ if hasattr(element, '__clause_element__'):
+ element = element.__clause_element__()
+ if not isinstance(element, Column):
+ raise exc.ArgumentError("schema.Column object expected")
+ return element
+
def _bind_or_error(schemaitem):
bind = schemaitem.bind
if not bind:
from sqlalchemy.ext import declarative as decl
from sqlalchemy import exc
from testlib import sa, testing
-from testlib.sa import MetaData, Table, Column, Integer, String, ForeignKey, ForeignKeyConstraint, asc
+from testlib.sa import MetaData, Table, Column, Integer, String, ForeignKey, ForeignKeyConstraint, asc, Index
from testlib.sa.orm import relation, create_session, class_mapper, eagerload, compile_mappers, backref, clear_mappers
from testlib.testing import eq_
from orm._base import ComparableEntity, MappedTest
id = Column('id', Integer, primary_key=True)
addresses = relation("Address")
- def go():
- class Address(Base):
- __tablename__ = 'addresses'
+ class Address(Base):
+ __tablename__ = 'addresses'
- id = Column(Integer, primary_key=True)
- foo = sa.orm.column_property(User.id == 5)
- self.assertRaises(sa.exc.InvalidRequestError, go)
+ id = Column(Integer, primary_key=True)
+ foo = sa.orm.column_property(User.id == 5)
+ # this used to raise an error when accessing User.id but that's no longer the case
+ # since we got rid of _CompileOnAttr.
+ self.assertRaises(sa.exc.ArgumentError, compile_mappers)
+
def test_nice_dependency_error_works_with_hasattr(self):
class User(Base):
__tablename__ = 'users'
id = Column('id', Integer, primary_key=True)
addresses = relation("Addresss")
- # doesn't raise, hasattr() squashes all exceptions
- # (except KeybaordInterrupt/SystemException in 2.6...whoopee)
- # TODO: determine what hasattr() does on py3K
- hasattr(User.id, 'in_')
- # but the exception is saved, compile_mappers tells us what it is
- # as well as some explaination
+ # hasattr() on a compile-loaded attribute
+ hasattr(User.addresses, 'property')
+ # the exeption is preserved
self.assertRaisesMessage(sa.exc.InvalidRequestError, r"suppressed within a hasattr\(\)", compile_mappers)
def test_custom_base(self):
assert hasattr(Base, 'metadata')
assert Base().foobar() == "foobar"
+ def test_index_doesnt_compile(self):
+ class User(Base):
+ __tablename__ = 'users'
+ id = Column('id', Integer, primary_key=True)
+ name = Column('name', String)
+ error = relation("Address")
+
+ i = Index('my_index', User.name)
+
+ # compile fails due to the nonexistent Addresses relation
+ self.assertRaises(sa.exc.InvalidRequestError, compile_mappers)
+
+ # index configured
+ assert i in User.__table__.indexes
+ assert User.__table__.c.id not in set(i.columns)
+ assert User.__table__.c.name in set(i.columns)
+
+ # tables create fine
+ Base.metadata.create_all()
+
def test_add_prop(self):
class User(Base, ComparableEntity):
__tablename__ = 'users'
(7, 2),
(6, 3))
+nodes = fixture_table(
+ Table('nodes', fixture_metadata,
+ Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('parent_id', Integer, ForeignKey('nodes.id')),
+ Column('data', String(30)),
+ test_needs_acid=True,
+ test_needs_fk=True
+ ),
+ ('id', 'parent_id', 'data')
+)
+
def _load_fixtures():
for table in fixture_metadata.sorted_tables:
pass
+class Node(Base):
+ pass
+
class FixtureTest(_base.MappedTest):
"""A MappedTest pre-configured for fixtures.
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=typecallable, useobject=True)
obj = Foo()
pass
canary = Canary()
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary,
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary,
typecallable=Custom, useobject=True)
obj = Foo()
canary = Canary()
creator = self.entity_maker
attributes.register_class(Foo)
- attributes.register_attribute(Foo, 'attr', True, extension=canary, useobject=True)
+ attributes.register_attribute(Foo, 'attr', uselist=True, extension=canary, useobject=True)
obj = Foo()
col1 = obj.attr
from testlib.sa import Table, Column, Integer, String, ForeignKey
from testlib.sa.orm import mapper, relation, backref, create_session
from testlib.testing import eq_
-from testlib.assertsql import RegexSQL, ExactSQL, CompiledSQL
+from testlib.assertsql import RegexSQL, ExactSQL, CompiledSQL, AllOf
from orm import _base
"VALUES (:favorite_ball_id, :data)",
lambda ctx:{'favorite_ball_id':b.id, 'data':'some data'}),
+ AllOf(
CompiledSQL("UPDATE ball SET person_id=:person_id "
"WHERE ball.id = :ball_id",
lambda ctx:{'person_id':p.id,'ball_id':b.id}),
CompiledSQL("UPDATE ball SET person_id=:person_id "
"WHERE ball.id = :ball_id",
- lambda ctx:{'person_id':p.id,'ball_id':b4.id}),
+ lambda ctx:{'person_id':p.id,'ball_id':b4.id})
+ ),
)
sess.delete(p)
self.assert_sql_execution(testing.db, sess.flush,
- CompiledSQL("UPDATE ball SET person_id=:person_id "
+ AllOf(CompiledSQL("UPDATE ball SET person_id=:person_id "
"WHERE ball.id = :ball_id",
lambda ctx:{'person_id': None, 'ball_id': b.id}),
CompiledSQL("UPDATE ball SET person_id=:person_id "
"WHERE ball.id = :ball_id",
- lambda ctx:{'person_id': None, 'ball_id': b4.id}),
+ lambda ctx:{'person_id': None, 'ball_id': b4.id})),
CompiledSQL("DELETE FROM person WHERE person.id = :id",
lambda ctx:[{'id':p.id}]),
from sqlalchemy.orm import *
from sqlalchemy.orm import exc as orm_exc
from testlib import *
+from testlib import sa, testing
+from orm import _base
from sqlalchemy.orm import attributes
from testlib.testing import eq_
pass
-class ConcreteTest(ORMTest):
+class ConcreteTest(_base.MappedTest):
def define_tables(self, metadata):
global managers_table, engineers_table, hackers_table, companies, employees_table
Column('company_id', Integer, ForeignKey('companies.id')),
Column('nickname', String(50))
)
+
+
def test_basic(self):
pjoin = polymorphic_union({
session.clear()
+ assert repr(session.query(Employee).filter(Employee.name=='Tom').one()) == "Manager Tom knows how to manage things"
+ assert repr(session.query(Manager).filter(Manager.name=='Tom').one()) == "Manager Tom knows how to manage things"
+
+
assert set([repr(x) for x in session.query(Employee).all()]) == set(["Engineer Jerry knows how to program", "Manager Tom knows how to manage things", "Hacker Kurt 'Badass' knows how to hack"])
assert set([repr(x) for x in session.query(Manager).all()]) == set(["Manager Tom knows how to manage things"])
assert set([repr(x) for x in session.query(Engineer).all()]) == set(["Engineer Jerry knows how to program", "Hacker Kurt 'Badass' knows how to hack"])
}, 'type', 'pjoin')
mapper(Company, companies, properties={
- 'employees':relation(Employee, lazy=False)
+ 'employees':relation(Employee)
})
employee_mapper = mapper(Employee, pjoin, polymorphic_on=pjoin.c.type)
manager_mapper = mapper(Manager, managers_table, inherits=employee_mapper, concrete=True, polymorphic_identity='manager')
def go():
c2 = session.query(Company).get(c.id)
assert set([repr(x) for x in c2.employees]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
+ self.assert_sql_count(testing.db, go, 2)
+ session.clear()
+ def go():
+ c2 = session.query(Company).options(eagerload(Company.employees)).get(c.id)
+ assert set([repr(x) for x in c2.employees]) == set(["Engineer Kurt knows how to hack", "Manager Tom knows how to manage things"])
self.assert_sql_count(testing.db, go, 1)
-class ColKeysTest(ORMTest):
+class PropertyInheritanceTest(_base.MappedTest):
+ def define_tables(self, metadata):
+ Table('a_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('some_c_id', Integer, ForeignKey('c_table.id')),
+ Column('aname', String(50)),
+ )
+ Table('b_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('some_c_id', Integer, ForeignKey('c_table.id')),
+ Column('bname', String(50)),
+ )
+ Table('c_table', metadata,
+ Column('id', Integer, primary_key=True)
+ )
+
+ def setup_classes(self):
+ class A(_base.ComparableEntity):
+ pass
+
+ class B(A):
+ pass
+
+ class C(_base.ComparableEntity):
+ pass
+
+ @testing.resolve_artifact_names
+ def test_noninherited_warning(self):
+ mapper(A, a_table, properties={
+ 'some_c':relation(C)
+ })
+ mapper(B, b_table,inherits=A, concrete=True)
+ mapper(C, c_table)
+
+ b = B()
+ c = C()
+ self.assertRaises(AttributeError, setattr, b, 'some_c', c)
+
+ clear_mappers()
+ mapper(A, a_table, properties={
+ 'a_id':a_table.c.id
+ })
+ mapper(B, b_table,inherits=A, concrete=True)
+ mapper(C, c_table)
+ b = B()
+ self.assertRaises(AttributeError, setattr, b, 'a_id', 3)
+
+ clear_mappers()
+ mapper(A, a_table, properties={
+ 'a_id':a_table.c.id
+ })
+ mapper(B, b_table,inherits=A, concrete=True)
+ mapper(C, c_table)
+
+ @testing.resolve_artifact_names
+ def test_inheriting(self):
+ mapper(A, a_table, properties={
+ 'some_c':relation(C, back_populates='many_a')
+ })
+ mapper(B, b_table,inherits=A, concrete=True, properties={
+ 'some_c':relation(C, back_populates='many_b')
+ })
+ mapper(C, c_table, properties={
+ 'many_a':relation(A, back_populates='some_c'),
+ 'many_b':relation(B, back_populates='some_c'),
+ })
+
+ sess = sessionmaker()()
+
+ c1 = C()
+ c2 = C()
+ a1 = A(some_c=c1, aname='a1')
+ a2 = A(some_c=c2, aname='a2')
+ b1 = B(some_c=c1, bname='b1')
+ b2 = B(some_c=c1, bname='b2')
+
+ self.assertRaises(AttributeError, setattr, b1, 'aname', 'foo')
+ self.assertRaises(AttributeError, getattr, A, 'bname')
+
+ assert c2.many_a == [a2]
+ assert c1.many_a == [a1]
+ assert c1.many_b == [b1, b2]
+
+ sess.add_all([c1, c2])
+ sess.commit()
+
+ assert sess.query(C).filter(C.many_a.contains(a2)).one() is c2
+ assert c2.many_a == [a2]
+ assert c1.many_a == [a1]
+ assert c1.many_b == [b1, b2]
+
+ assert sess.query(B).filter(B.bname=='b1').one() is b1
+
+ @testing.resolve_artifact_names
+ def test_polymorphic_backref(self):
+ """test multiple backrefs to the same polymorphically-loading attribute."""
+
+ ajoin = polymorphic_union(
+ {'a':a_table,
+ 'b':b_table
+ }, 'type', 'ajoin'
+ )
+ mapper(A, a_table, with_polymorphic=('*', ajoin),
+ polymorphic_on=ajoin.c.type, polymorphic_identity='a',
+ properties={
+ 'some_c':relation(C, back_populates='many_a')
+ })
+ mapper(B, b_table,inherits=A, concrete=True,
+ polymorphic_identity='b',
+ properties={
+ 'some_c':relation(C, back_populates='many_a')
+ })
+ mapper(C, c_table, properties={
+ 'many_a':relation(A, collection_class=set, back_populates='some_c'),
+ })
+
+ sess = sessionmaker()()
+
+ c1 = C()
+ c2 = C()
+ a1 = A(some_c=c1)
+ a2 = A(some_c=c2)
+ b1 = B(some_c=c1)
+ b2 = B(some_c=c1)
+
+ assert c2.many_a == set([a2])
+ assert set(c1.many_a) == set([a1, b1, b2]) # TODO: not sure whats going on with the set comparison here
+
+ sess.add_all([c1, c2])
+ sess.commit()
+
+ assert sess.query(C).filter(C.many_a.contains(a2)).one() is c2
+ assert sess.query(C).filter(C.many_a.contains(b1)).one() is c1
+ assert c2.many_a == set([a2])
+ assert c1.many_a == set([a1, b1, b2])
+
+ sess.expire_all()
+ def go():
+ eq_(
+ sess.query(C).options(eagerload(C.many_a)).all(),
+ [C(many_a=set([a1, b1, b2])), C(many_a=set([a2]))]
+ )
+ self.assert_sql_count(testing.db, go, 1)
+
+
+class ColKeysTest(_base.MappedTest):
def define_tables(self, metadata):
global offices_table, refugees_table
refugees_table = Table('refugee', metadata,
dict(office_fid=1, name=u"office1"),
dict(office_fid=2, name=u"office2")
)
-
+
def test_keys(self):
pjoin = polymorphic_union({
'refugee': refugees_table,
@testing.resolve_artifact_names
def test_exceptions_sticky(self):
+ """test preservation of mapper compile errors raised during hasattr()."""
+
mapper(Address, addresses, properties={
'user':relation(User)
})
- hasattr(Address.id, 'in_')
+
+ hasattr(Address.user, 'property')
self.assertRaisesMessage(sa.exc.InvalidRequestError, r"suppressed within a hasattr\(\)", compile_mappers)
@testing.resolve_artifact_names
adlist = synonym('addresses', proxy=True),
adname = synonym('addresses')
))
+
+ # ensure the synonym can get at the proxied comparators without
+ # an explicit compile
+ User.name == 'ed'
+ User.adname.any()
assert hasattr(User, 'adlist')
# as of 0.4.2, synonyms always create a property
# test compile
assert not isinstance(User.uname == 'jack', bool)
-
+
+ assert User.uname.property
+ assert User.adlist.property
+
sess = create_session()
u = sess.query(User).filter(User.uname=='jack').one()
from testlib.sa import Table, Column, Integer, String, ForeignKey, MetaData
from testlib.sa.orm import mapper, relation, backref, create_session, compile_mappers, clear_mappers
from testlib.testing import eq_, startswith_
-from orm import _base
+from orm import _base, _fixtures
class RelationTest(_base.MappedTest):
[TagInstance(data='iplc_case'), TagInstance(data='not_iplc_case')]
)
+class ManualBackrefTest(_fixtures.FixtureTest):
+ """Test explicit relations that are backrefs to each other."""
+ run_inserts = None
+
+ @testing.resolve_artifact_names
+ def test_o2m(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, back_populates='user')
+ })
+
+ mapper(Address, addresses, properties={
+ 'user':relation(User, back_populates='addresses')
+ })
+
+ sess = create_session()
+
+ u1 = User(name='u1')
+ a1 = Address(email_address='foo')
+ u1.addresses.append(a1)
+ assert a1.user is u1
+
+ sess.add(u1)
+ sess.flush()
+ sess.expire_all()
+ assert sess.query(Address).one() is a1
+ assert a1.user is u1
+ assert a1 in u1.addresses
+
+ @testing.resolve_artifact_names
+ def test_invalid_key(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, back_populates='userr')
+ })
+
+ mapper(Address, addresses, properties={
+ 'user':relation(User, back_populates='addresses')
+ })
+
+ self.assertRaises(sa.exc.InvalidRequestError, compile_mappers)
+
+ @testing.resolve_artifact_names
+ def test_invalid_target(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, back_populates='dingaling'),
+ })
+
+ mapper(Dingaling, dingalings)
+ mapper(Address, addresses, properties={
+ 'dingaling':relation(Dingaling)
+ })
+
+ self.assertRaisesMessage(sa.exc.ArgumentError,
+ r"reverse_property 'dingaling' on relation User.addresses references "
+ "relation Address.dingaling, which does not reference mapper Mapper\|User\|users",
+ compile_mappers)
+
class JoinConditionErrorTest(testing.TestBase):
def test_clauseelement_pj(self):