from sqlalchemy import sql, util, log
import sqlalchemy.exceptions as sa_exc
from sqlalchemy.sql.util import ClauseAdapter, criterion_as_pairs
-from sqlalchemy.sql import operators, ColumnElement, expression
+from sqlalchemy.sql import operators, expression
from sqlalchemy.orm import mapper, strategies, attributes, dependency, \
- object_mapper, session as sessionlib
+ object_mapper
from sqlalchemy.orm.util import CascadeOptions, _class_to_mapper, _orm_annotate
from sqlalchemy.orm.interfaces import StrategizedProperty, PropComparator, \
MapperProperty, ONETOMANY, MANYTOONE, MANYTOMANY
def __clause_element__(self):
return self.prop.columns[0]._annotate({"parententity": self.mapper})
__clause_element__ = util.cache_decorator(__clause_element__)
-
+
def operate(self, op, *other, **kwargs):
return op(self.__clause_element__(), *other, **kwargs)
def do_init(self):
class_ = self.parent.class_
-
+
self.logger.info("register managed attribute %s on class %s" % (self.key, class_.__name__))
if self.descriptor is None:
class SynonymProp(object):
of items that correspond to a related database table.
"""
- def __init__(self, argument,
- secondary=None, primaryjoin=None,
- secondaryjoin=None, entity_name=None,
- foreign_keys=None,
- uselist=None,
- order_by=False,
- backref=None,
- _is_backref=False,
- post_update=False,
- cascade=None,
- viewonly=False, lazy=True,
- collection_class=None, passive_deletes=False,
- passive_updates=True, remote_side=None,
- enable_typechecks=True, join_depth=None,
+ def __init__(self, argument,
+ secondary=None, primaryjoin=None,
+ secondaryjoin=None, entity_name=None,
+ foreign_keys=None,
+ uselist=None,
+ order_by=False,
+ backref=None,
+ _is_backref=False,
+ post_update=False,
+ cascade=None,
+ viewonly=False, lazy=True,
+ collection_class=None, passive_deletes=False,
+ passive_updates=True, remote_side=None,
+ enable_typechecks=True, join_depth=None,
strategy_class=None, _local_remote_pairs=None):
-
+
self.uselist = uselist
self.argument = argument
self.entity_name = entity_name
self.direction = None
self.viewonly = viewonly
self.lazy = lazy
- self._foreign_keys = foreign_keys
+ self._foreign_keys = foreign_keys
self.collection_class = collection_class
self.passive_deletes = passive_deletes
self.passive_updates = passive_updates
self.__join_cache = {}
self.comparator_factory = PropertyLoader.Comparator
util.set_creation_order(self)
-
+
if strategy_class:
self.strategy_class = strategy_class
elif self.lazy == 'dynamic':
self.strategy_class = strategies.LazyLoader
self._reverse_property = None
-
+
if cascade is not None:
self.cascade = CascadeOptions(cascade)
else:
else:
self.backref = backref
self._is_backref = _is_backref
-
+
class Comparator(PropComparator):
def __init__(self, prop, mapper, of_type=None):
self.prop = self.property = prop
self.mapper = mapper
if of_type:
self._of_type = _class_to_mapper(of_type)
-
+
def parententity(self):
return self.prop.parent
parententity = property(parententity)
-
+
def __clause_element__(self):
return self.prop.parent._with_polymorphic_selectable
def reverse_operate(self, op, other, **kwargs):
return op(self, *other, **kwargs)
-
+
def of_type(self, cls):
return PropertyLoader.Comparator(self.prop, self.mapper, cls)
-
+
def __eq__(self, other):
if other is None:
if self.prop.direction in [ONETOMANY, MANYTOMANY]:
criterion = crit
else:
criterion = criterion & crit
-
+
if sj:
j = _orm_annotate(pj) & sj
else:
j = _orm_annotate(pj, exclude=self.prop.remote_side)
-
+
if criterion and target_adapter:
# limit this adapter to annotated only?
criterion = target_adapter.traverse(criterion)
-
+
# only have the "joined left side" of what we return be subject to Query adaption. The right
# side of it is used for an exists() subquery and should not correlate or otherwise reach out
# to anything in the enclosing query.
if criterion:
criterion = criterion._annotate({'_halt_adapt': True})
return sql.exists([1], j & criterion, from_obj=dest).correlate(source)
-
+
def any(self, criterion=None, **kwargs):
if not self.prop.uselist:
raise sa_exc.InvalidRequestError("'any()' not implemented for scalar attributes. Use has().")
def __negated_contains_or_equals(self, other):
criterion = sql.and_(*[x==y for (x, y) in zip(self.prop.mapper.primary_key, self.prop.mapper.primary_key_from_instance(other))])
return ~self._criterion_exists(criterion)
-
+
def __ne__(self, other):
if other is None:
if self.prop.direction == MANYTOONE:
if dont_load:
coll = attributes.init_collection(dest_state, self.key)
for c in dest_list:
- coll.append_without_event(c)
+ coll.append_without_event(c)
else:
getattr(dest.__class__, self.key).impl._set_iterable(dest_state, dest_list)
else:
if not type_ in self.cascade:
return
# only actively lazy load on the 'delete' cascade
- passive = type_ != 'delete' or self.passive_deletes
+ passive = type_ != 'delete' or self.passive_deletes
mapper = self.mapper.primary_mapper()
instances = state.value_as_iterable(self.key, passive=passive)
if instances:
for attr in ('order_by', 'primaryjoin', 'secondaryjoin', 'secondary', '_foreign_keys', 'remote_side'):
if callable(getattr(self, attr)):
setattr(self, attr, getattr(self, attr)())
-
+
self._foreign_keys = util.to_set(self._foreign_keys)
self.remote_side = util.to_set(self.remote_side)
-
+
if not self.parent.concrete:
for inheriting in self.parent.iterate_to_root():
if inheriting is not self.parent and inheriting._get_property(self.key, raiseerr=False):
# TODO: remove 'self.table'
self.target = self.table = self.mapper.mapped_table
-
+
if self.cascade.delete_orphan:
if self.parent.class_ is self.mapper.class_:
raise sa_exc.ArgumentError("In relationship '%s', can't establish 'delete-orphan' cascade "
return self.parent.mapped_table.c.contains_column(column) or \
self.target.c.contains_column(column) or \
self.secondary.c.contains_column(column) is not None
-
+
def _determine_synchronize_pairs(self):
if self.local_remote_pairs:
raise sa_exc.ArgumentError("foreign_keys argument is required with _local_remote_pairs argument")
self.synchronize_pairs = []
-
+
for l, r in self.local_remote_pairs:
if r in self._foreign_keys:
self.synchronize_pairs.append((l, r))
else:
raise sa_exc.ArgumentError("Could not determine relation direction for primaryjoin condition '%s', on relation %s. "
"Specify the 'foreign_keys' argument to indicate which columns on the relation are foreign." % (self.primaryjoin, self))
-
+
self.synchronize_pairs = eq_pairs
-
+
if self.secondaryjoin:
sq_pairs = criterion_as_pairs(self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=self.viewonly)
sq_pairs = [(l, r) for l, r in sq_pairs if (self._col_is_part_of_mappings(l) and self._col_is_part_of_mappings(r)) or r in self._foreign_keys]
-
+
if not sq_pairs:
if not self.viewonly and criterion_as_pairs(self.secondaryjoin, consider_as_foreign_keys=self._foreign_keys, any_operator=True):
raise sa_exc.ArgumentError("Could not locate any equated, locally mapped column pairs for secondaryjoin condition '%s' on relation %s. "
self.secondary_synchronize_pairs = sq_pairs
else:
self.secondary_synchronize_pairs = None
-
+
self._foreign_keys = util.Set([r for l, r in self.synchronize_pairs])
if self.secondary_synchronize_pairs:
self._foreign_keys.update([r for l, r in self.secondary_synchronize_pairs])
if self.remote_side:
if self.direction is MANYTOONE:
self.local_remote_pairs = [
- (r, l) for l, r in
+ (r, l) for l, r in
criterion_as_pairs(self.primaryjoin, consider_as_referenced_keys=self.remote_side, any_operator=True)
]
else:
self.local_remote_pairs = eq_pairs
elif self.remote_side:
raise sa_exc.ArgumentError("remote_side argument is redundant against more detailed _local_remote_side argument.")
-
+
for l, r in self.local_remote_pairs:
-
+
if self.direction is ONETOMANY and not self._col_is_part_of_mappings(l):
raise sa_exc.ArgumentError("Local column '%s' is not part of mapping %s. Specify remote_side argument to indicate which column lazy join condition should compare against." % (l, self.parent))
-
+
elif self.direction is MANYTOONE and not self._col_is_part_of_mappings(r):
raise sa_exc.ArgumentError("Remote column '%s' is not part of mapping %s. Specify remote_side argument to indicate which column lazy join condition should bind." % (r, self.mapper))
-
+
self.local_side, self.remote_side = [util.OrderedSet(x) for x in zip(*list(self.local_remote_pairs))]
-
+
def _post_init(self):
if log.is_info_enabled(self.logger):
def _refers_to_parent_table(self):
return self.parent.mapped_table is self.target or self.parent.mapped_table is self.target
-
+
def _is_self_referential(self):
return self.mapper.common_parent(self.parent)
-
+
def _create_joins(self, source_polymorphic=False, source_selectable=None, dest_polymorphic=False, dest_selectable=None):
key = util.WeakCompositeKey(source_polymorphic, source_selectable, dest_polymorphic, dest_selectable)
try:
aliased = True
else:
dest_selectable = self.mapper.mapped_table
-
+
if self._is_self_referential() and source_selectable is None:
dest_selectable = dest_selectable.alias()
aliased = True
else:
aliased = True
-
+
aliased = aliased or bool(source_selectable)
primaryjoin, secondaryjoin, secondary = self.primaryjoin, self.secondaryjoin, self.secondary
else:
if dest_selectable:
primary_aliasizer = ClauseAdapter(dest_selectable, exclude=self.local_side, equivalents=self.mapper._equivalent_columns)
- if source_selectable:
+ if source_selectable:
primary_aliasizer.chain(ClauseAdapter(source_selectable, exclude=self.remote_side, equivalents=self.parent._equivalent_columns))
elif source_selectable:
primary_aliasizer = ClauseAdapter(source_selectable, exclude=self.remote_side, equivalents=self.parent._equivalent_columns)
secondary_aliasizer = None
-
+
primaryjoin = primary_aliasizer.traverse(primaryjoin)
target_adapter = secondary_aliasizer or primary_aliasizer
target_adapter.include = target_adapter.exclude = None
self.__join_cache[key] = ret = (primaryjoin, secondaryjoin, (source_selectable or self.parent.local_table), (dest_selectable or self.mapper.local_table), secondary, target_adapter)
return ret
-
+
def _get_join(self, parent, primary=True, secondary=True, polymorphic_parent=True):
"""deprecated. use primary_join_against(), secondary_join_against(), full_join_against()"""
-
+
pj, sj, source, dest, secondarytable, adapter = self._create_joins(source_polymorphic=polymorphic_parent)
-
+
if primary and secondary:
return pj & sj
elif primary:
return sj
else:
raise AssertionError("illegal condition")
-
-
+
+
def register_dependencies(self, uowcommit):
if not self.viewonly:
self._dependency_processor.register_dependencies(uowcommit)
self.kwargs = kwargs
self.prop = _prop
self.extension = attributes.GenericBackrefExtension(self.key)
-
+
def compile(self, prop):
if self.prop:
return
pj = self.kwargs.pop('primaryjoin', prop.primaryjoin)
sj = self.kwargs.pop('secondaryjoin', None)
if sj:
- raise exceptions.InvalidRequestError("Can't assign 'secondaryjoin' on a backref against a non-secondary relation.")
-
+ raise sa_exc.InvalidRequestError(
+ "Can't assign 'secondaryjoin' on a backref against "
+ "a non-secondary relation.")
+
parent = prop.parent.primary_mapper()
self.kwargs.setdefault('viewonly', prop.viewonly)
self.kwargs.setdefault('post_update', prop.post_update)
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-import inspect
-import types
-
import sqlalchemy.exceptions as sa_exc
from sqlalchemy.util import ScopedRegistry, to_list, get_cls_kwargs
from sqlalchemy.orm import MapperExtension, EXT_CONTINUE, object_session, \
Usage::
Session = scoped_session(sessionmaker(autoflush=True))
-
+
To map classes so that new instances are saved in the current
Session automatically, as well as to provide session-aware
class attributes such as "query":
-
+
mapper = Session.mapper
mapper(Class, table, ...)
return self.session_factory(**kwargs)
else:
return self.registry()
-
+
def remove(self):
if self.registry.has():
self.registry().close()
self.registry.clear()
-
+
def mapper(self, *args, **kwargs):
"""return a mapper() function which associates this ScopedSession with the Mapper."""
-
+
from sqlalchemy.orm import mapper
-
+
extension_args = dict([(arg, kwargs.pop(arg))
for arg in get_cls_kwargs(_ScopedExt)
if arg in kwargs])
-
+
kwargs['extension'] = extension = to_list(kwargs.get('extension', []))
if extension_args:
extension.append(self.extension.configure(**extension_args))
else:
extension.append(self.extension)
return mapper(*args, **kwargs)
-
+
def configure(self, **kwargs):
"""reconfigure the sessionmaker used by this ScopedSession."""
-
+
self.session_factory.configure(**kwargs)
def query_property(self):
"""return a class property which produces a `Query` object against the
class when called.
-
+
e.g.::
Session = scoped_session(sessionmaker())
-
+
class MyClass(object):
query = Session.query_property()
-
+
# after mappers are defined
result = MyClass.query.filter(MyClass.name=='foo').all()
-
+
"""
-
+
class query(object):
def __get__(s, instance, owner):
mapper = class_mapper(owner, raiseerror=False)
else:
return None
return query()
-
+
def instrument(name):
def do(self, *args, **kwargs):
return getattr(self.registry(), name)(*args, **kwargs)
return classmethod(do)
for prop in ('close_all', 'object_session', 'identity_key'):
setattr(ScopedSession, prop, clslevel(prop))
-
+
class _ScopedExt(MapperExtension):
def __init__(self, context, validate=False, save_on_init=True):
self.context = context
self.validate = validate
self.save_on_init = save_on_init
self.set_kwargs_on_init = None
-
+
def validating(self):
return _ScopedExt(self.context, validate=True)
-
+
def configure(self, **kwargs):
return _ScopedExt(self.context, **kwargs)
-
+
def instrument_class(self, mapper, class_):
class query(object):
def __getattr__(s, key):
return self.context.registry().query(class_)
def __get__(self, instance, cls):
return self
-
- if not 'query' in class_.__dict__:
+
+ if not 'query' in class_.__dict__:
class_.query = query()
if self.set_kwargs_on_init is None:
delattr(class_, '__init__')
if hasattr(class_, 'query'):
delattr(class_, 'query')
-
-
-