from .query import Query
from .. import exc
from .. import log
-from .. import sql
from .. import util
-from ..engine import result as _result
-from ..sql import selectable
-from ..sql import util as sql_util
-from ..sql.base import _generative
-from ..sql.base import Generative
@log.class_logger
dispatch,
target_mapper,
order_by,
+ query_class=None,
**kw
):
super(DynamicAttributeImpl, self).__init__(
)
self.target_mapper = target_mapper
self.order_by = order_by
- self.query_class = AppenderQuery
+ if not query_class:
+ self.query_class = AppenderQuery
+ elif AppenderMixin in query_class.mro():
+ self.query_class = query_class
+ else:
+ self.query_class = mixin_user_query(query_class)
def get(self, state, dict_, passive=attributes.PASSIVE_OFF):
if not passive & attributes.SQL_OK:
self.remove(state, dict_, value, initiator, passive=passive)
-class AppenderQuery(Generative):
- """A dynamic query that supports basic collection storage operations."""
+class AppenderMixin(object):
+ query_class = None
def __init__(self, attr, state):
-
- # this can be select() except for aliased=True flag on join()
- # and corresponding behaviors on select().
- self._is_core = False
- self._statement = Query([attr.target_mapper], None)
-
- # self._is_core = True
- # self._statement = sql.select(attr.target_mapper)._set_label_style(
- # selectable.LABEL_STYLE_TABLENAME_PLUS_COL
- # )
-
- self._autoflush = True
+ super(AppenderMixin, self).__init__(attr.target_mapper, None)
self.instance = instance = state.obj()
self.attr = attr
- self.mapper = mapper = object_mapper(instance)
+ mapper = object_mapper(instance)
prop = mapper._props[self.attr.key]
if prop.secondary is not None:
# is in the FROM. So we purposely put the mapper selectable
# in _from_obj[0] to ensure a user-defined join() later on
# doesn't fail, and secondary is then in _from_obj[1].
- self._statement = self._statement.select_from(
- prop.mapper.selectable, prop.secondary
- )
+ self._from_obj = (prop.mapper.selectable, prop.secondary)
- self._statement = self._statement.where(
+ self._where_criteria = (
prop._with_parent(instance, alias_secondary=False),
)
if self.attr.order_by:
-
- self._statement = self._statement.order_by(*self.attr.order_by)
-
- @_generative
- def autoflush(self, setting):
- """Set autoflush to a specific setting.
-
- Note that a Session with autoflush=False will
- not autoflush, even if this flag is set to True at the
- Query level. Therefore this flag is usually used only
- to disable autoflush for a specific Query.
-
- """
- self._autoflush = setting
-
- @property
- def statement(self):
- """Return the Core statement represented by this
- :class:`.AppenderQuery`.
-
- """
- if self._is_core:
- return self._statement._set_label_style(
- selectable.LABEL_STYLE_DISAMBIGUATE_ONLY
- )
-
- else:
- return self._statement.statement
-
- def filter(self, *criteria):
- """A synonym for the :meth:`_orm.AppenderQuery.where` method."""
-
- return self.where(*criteria)
-
- @_generative
- def where(self, *criteria):
- r"""Apply the given WHERE criterion, using SQL expressions.
-
- Equivalent to :meth:`.Select.where`.
-
- """
- self._statement = self._statement.where(*criteria)
-
- @_generative
- def order_by(self, *criteria):
- r"""Apply the given ORDER BY criterion, using SQL expressions.
-
- Equivalent to :meth:`.Select.order_by`.
-
- """
- self._statement = self._statement.order_by(*criteria)
-
- @_generative
- def filter_by(self, **kwargs):
- r"""Apply the given filtering criterion using keyword expressions.
-
- Equivalent to :meth:`.Select.filter_by`.
-
- """
- self._statement = self._statement.filter_by(**kwargs)
-
- @_generative
- def join(self, target, *props, **kwargs):
- r"""Create a SQL JOIN against this
- object's criterion.
-
- Equivalent to :meth:`.Select.join`.
- """
-
- self._statement = self._statement.join(target, *props, **kwargs)
-
- @_generative
- def outerjoin(self, target, *props, **kwargs):
- r"""Create a SQL LEFT OUTER JOIN against this
- object's criterion.
-
- Equivalent to :meth:`.Select.outerjoin`.
-
- """
-
- self._statement = self._statement.outerjoin(target, *props, **kwargs)
-
- def scalar(self):
- """Return the first element of the first result or None
- if no rows present. If multiple rows are returned,
- raises MultipleResultsFound.
-
- Equivalent to :meth:`_query.Query.scalar`.
-
- .. versionadded:: 1.1.6
-
- """
- return self._iter().scalar()
-
- def first(self):
- """Return the first row.
-
- Equivalent to :meth:`_query.Query.first`.
-
- """
-
- # replicates limit(1) behavior
- if self._statement is not None:
- return self._iter().first()
- else:
- return self.limit(1)._iter().first()
-
- def one(self):
- """Return exactly one result or raise an exception.
-
- Equivalent to :meth:`_query.Query.one`.
-
- """
- return self._iter().one()
-
- def one_or_none(self):
- """Return one or zero results, or raise an exception for multiple
- rows.
-
- Equivalent to :meth:`_query.Query.one_or_none`.
-
- .. versionadded:: 1.0.9
-
- """
- return self._iter().one_or_none()
-
- def all(self):
- """Return all rows.
-
- Equivalent to :meth:`_query.Query.all`.
-
- """
- return self._iter().all()
+ self._order_by_clauses = self.attr.order_by
def session(self):
sess = object_session(self.instance)
if (
sess is not None
- and self._autoflush
+ and self.autoflush
and sess.autoflush
and self.instance in sess
):
session = property(session, lambda s, x: None)
- def _execute(self, sess=None):
- # note we're returning an entirely new Query class instance
- # here without any assignment capabilities; the class of this
- # query is determined by the session.
- instance = self.instance
- if sess is None:
- sess = object_session(instance)
- if sess is None:
- raise orm_exc.DetachedInstanceError(
- "Parent instance %s is not bound to a Session, and no "
- "contextual session is established; lazy load operation "
- "of attribute '%s' cannot proceed"
- % (orm_util.instance_str(instance), self.attr.key)
- )
-
- result = sess.execute(self._statement)
- result = result.scalars()
-
- if result._attributes.get("filtered", False):
- result = result.unique()
-
- return result
-
- def _iter(self):
+ def __iter__(self):
sess = self.session
if sess is None:
- instance = self.instance
- state = attributes.instance_state(instance)
-
- if state.detached:
- raise orm_exc.DetachedInstanceError(
- "Parent instance %s is not bound to a Session, and no "
- "contextual session is established; lazy load operation "
- "of attribute '%s' cannot proceed"
- % (orm_util.instance_str(instance), self.attr.key)
- )
- else:
- iterator = (
- (item,)
- for item in self.attr._get_collection_history(
- state,
- attributes.PASSIVE_NO_INITIALIZE,
- ).added_items
- )
-
- row_metadata = _result.SimpleResultMetaData(
- (self.mapper.class_.__name__,),
- [],
- _unique_filters=[id],
- )
-
- return _result.IteratorResult(row_metadata, iterator).scalars()
+ return iter(
+ self.attr._get_collection_history(
+ attributes.instance_state(self.instance),
+ attributes.PASSIVE_NO_INITIALIZE,
+ ).added_items
+ )
else:
- return self._execute(sess)
-
- def __iter__(self):
- return iter(self._iter())
+ return iter(self._generate(sess))
def __getitem__(self, index):
sess = self.session
attributes.PASSIVE_NO_INITIALIZE,
).indexed(index)
else:
- return orm_util._getitem(
- self, index, allow_negative=not self.session.future
- )
-
- @_generative
- def limit(self, limit):
- self._statement = self._statement.limit(limit)
-
- @_generative
- def offset(self, offset):
- self._statement = self._statement.offset(offset)
-
- @_generative
- def slice(self, start, stop):
- """Computes the "slice" represented by
- the given indices and apply as LIMIT/OFFSET.
-
-
- """
- limit_clause, offset_clause = sql_util._make_slice(
- self._statement._limit_clause,
- self._statement._offset_clause,
- start,
- stop,
- )
-
- self._statement = self._statement.limit(limit_clause).offset(
- offset_clause
- )
+ return self._generate(sess).__getitem__(index)
def count(self):
- """return the 'count'.
-
- Equivalent to :meth:`_query.Query.count`.
-
-
- """
-
sess = self.session
if sess is None:
return len(
).added_items
)
else:
- col = sql.func.count(sql.literal_column("*"))
+ return self._generate(sess).count()
+
+ def _generate(self, sess=None):
+ # note we're returning an entirely new Query class instance
+ # here without any assignment capabilities; the class of this
+ # query is determined by the session.
+ instance = self.instance
+ if sess is None:
+ sess = object_session(instance)
+ if sess is None:
+ raise orm_exc.DetachedInstanceError(
+ "Parent instance %s is not bound to a Session, and no "
+ "contextual session is established; lazy load operation "
+ "of attribute '%s' cannot proceed"
+ % (orm_util.instance_str(instance), self.attr.key)
+ )
+
+ if self.query_class:
+ query = self.query_class(self.attr.target_mapper, session=sess)
+ else:
+ query = sess.query(self.attr.target_mapper)
+
+ query._where_criteria = self._where_criteria
+ query._from_obj = self._from_obj
+ query._order_by_clauses = self._order_by_clauses
- stmt = sql.select(col).select_from(self._statement.subquery())
- return self.session.execute(stmt).scalar()
+ return query
def extend(self, iterator):
for item in iterator:
)
+class AppenderQuery(AppenderMixin, Query):
+ """A dynamic query that supports basic collection storage operations."""
+
+
+def mixin_user_query(cls):
+ """Return a new class with AppenderQuery functionality layered over."""
+ name = "Appender" + cls.__name__
+ return type(name, (AppenderMixin, cls), {"query_class": cls})
+
+
class CollectionHistory(object):
"""Overrides AttributeHistory to receive append/remove events directly."""