import operator
from itertools import groupby
-from sqlalchemy import sql, util, exc as sa_exc
+from sqlalchemy import sql, util, exc as sa_exc, schema
from sqlalchemy.orm import attributes, sync, \
- exc as orm_exc
+ exc as orm_exc,\
+ evaluator
-from sqlalchemy.orm.util import _state_mapper, state_str
+from sqlalchemy.orm.util import _state_mapper, state_str, _attr_as_key
+from sqlalchemy.sql import expression
def save_obj(base_mapper, states, uowtransaction, single=False):
"""Issue ``INSERT`` and/or ``UPDATE`` statements for a list
def _organize_states_for_save(base_mapper, states, uowtransaction):
"""Make an initial pass across a set of states for INSERT or
UPDATE.
-
+
This includes splitting out into distinct lists for
each, calling before_insert/before_update, obtaining
key information for each state including its dictionary,
mapper, the connection to use for the execution per state,
and the identity flag.
-
+
"""
states_to_insert = []
uowtransaction):
"""Make an initial pass across a set of states for UPDATE
corresponding to post_update.
-
+
This includes obtaining key information for each state
including its dictionary, mapper, the connection to use for
the execution per state.
-
+
"""
return list(_connections_for_states(base_mapper, uowtransaction,
states))
def _organize_states_for_delete(base_mapper, states, uowtransaction):
"""Make an initial pass across a set of states for DELETE.
-
+
This includes calling out before_delete and obtaining
key information for each state including its dictionary,
mapper, the connection to use for the execution per state.
-
+
"""
states_to_delete = []
states_to_insert):
"""Identify sets of values to use in INSERT statements for a
list of states.
-
+
"""
insert = []
for state, state_dict, mapper, connection, has_identity, \
table, states_to_update):
"""Identify sets of values to use in UPDATE statements for a
list of states.
-
+
This function works intricately with the history system
to determine exactly what values should be updated
as well as how the row should be matched within an UPDATE
states_to_insert, states_to_update):
"""finalize state on states that have been inserted or updated,
including calling after_insert/after_update events.
-
+
"""
for state, state_dict, mapper, connection, has_identity, \
instance_key, row_switch in states_to_insert + \
def _connections_for_states(base_mapper, uowtransaction, states):
"""Return an iterator of (state, state.dict, mapper, connection).
-
+
The states are sorted according to _sort_states, then paired
with the connection they should be using for the given
unit of work transaction.
-
+
"""
# if session has a connection callable,
# organize individual states with the connection
return sorted(pending, key=operator.attrgetter("insert_order")) + \
sorted(persistent, key=lambda q:q.key[1])
+class BulkUD(object):
+    """Handle bulk update and deletes via a :class:`.Query`."""
+
+    def __init__(self, query):
+        # joined/subquery eager loading does not apply to a bulk
+        # UPDATE/DELETE, so disable it on the Query up front.
+        self.query = query.enable_eagerloads(False)
+
+    @classmethod
+    def _factory(cls, lookup, synchronize_session, *arg):
+        # Resolve the ``synchronize_session`` argument ('evaluate',
+        # 'fetch' or False) to the concrete BulkUD subclass in
+        # ``lookup`` and construct it with ``*arg``.
+        try:
+            klass = lookup[synchronize_session]
+        except KeyError:
+            raise sa_exc.ArgumentError(
+                "Valid strategies for session synchronization "
+                "are %s" % (", ".join(sorted(repr(x)
+                    for x in lookup.keys()))))
+        else:
+            return klass(*arg)
+
+    def exec_(self):
+        # Template method: run the fixed sequence of steps; the
+        # individual _do_* hooks are supplied by subclasses/mixins.
+        self._do_pre()
+        self._do_pre_synchronize()
+        self._do_exec()
+        self._do_post_synchronize()
+        self._do_post()
+
+    def _do_pre(self):
+        # Compile the Query and validate that it addresses exactly one
+        # plain Table; multi-table/selectable criteria are unsupported.
+        query = self.query
+        self.context = context = query._compile_context()
+        if len(context.statement.froms) != 1 or \
+            not isinstance(context.statement.froms[0], schema.Table):
+            raise sa_exc.ArgumentError(
+                "Only update via a single table query is "
+                "currently supported")
+        self.primary_table = context.statement.froms[0]
+
+        session = query.session
+
+        # flush pending changes first (when autoflush is enabled) so
+        # the emitted UPDATE/DELETE sees them
+        if query._autoflush:
+            session._autoflush()
+
+    def _do_pre_synchronize(self):
+        # no-op hook; overridden by the 'evaluate'/'fetch' mixins
+        pass
+
+    def _do_post_synchronize(self):
+        # no-op hook; overridden by the 'evaluate'/'fetch' mixins
+        pass
+
+class BulkEvaluate(BulkUD):
+    """BulkUD which does the 'evaluate' method of session state resolution."""
+
+    def _additional_evaluators(self, evaluator_compiler):
+        # hook for subclasses to compile extra evaluators (e.g. the
+        # SET values of an UPDATE); the base implementation is a no-op
+        pass
+
+    def _do_pre_synchronize(self):
+        # Compile the Query's WHERE criterion into a Python-level
+        # predicate; criteria that cannot be evaluated in Python are
+        # reported as InvalidRequestError.
+        query = self.query
+        try:
+            evaluator_compiler = evaluator.EvaluatorCompiler()
+            if query.whereclause is not None:
+                eval_condition = evaluator_compiler.process(
+                    query.whereclause)
+            else:
+                # no WHERE clause: every object of the target class matches
+                def eval_condition(obj):
+                    return True
+
+            self._additional_evaluators(evaluator_compiler)
+
+        except evaluator.UnevaluatableError:
+            raise sa_exc.InvalidRequestError(
+                "Could not evaluate current criteria in Python. "
+                "Specify 'fetch' or False for the "
+                "synchronize_session parameter.")
+        target_cls = query._mapper_zero().class_
+
+        #TODO: detect when the where clause is a trivial primary key match
+        # apply the predicate to every in-session object of the target
+        # class (or a subclass) to find the objects to synchronize
+        self.matched_objects = [
+            obj for (cls, pk),obj in
+            query.session.identity_map.iteritems()
+            if issubclass(cls, target_cls) and
+            eval_condition(obj)]
+
+class BulkFetch(BulkUD):
+    """BulkUD which does the 'fetch' method of session state resolution."""
+
+    def _do_pre_synchronize(self):
+        # Before the UPDATE/DELETE is emitted, SELECT the primary keys
+        # of all rows matched by the criterion, so the corresponding
+        # in-session objects can be synchronized afterwards.
+        query = self.query
+        session = query.session
+        select_stmt = self.context.statement.with_only_columns(
+            self.primary_table.primary_key)
+        self.matched_rows = session.execute(
+            select_stmt,
+            params=query._params).fetchall()
+
+class BulkUpdate(BulkUD):
+    """BulkUD which handles UPDATEs."""
+
+    def __init__(self, query, values):
+        super(BulkUpdate, self).__init__(query)
+        # reject SELECT-only modifiers (LIMIT/OFFSET/DISTINCT etc.)
+        self.query._no_select_modifiers("update")
+        self.values = values
+
+    @classmethod
+    def factory(cls, query, synchronize_session, values):
+        # choose the BulkUpdate variant per synchronize_session
+        return BulkUD._factory({
+            "evaluate":BulkUpdateEvaluate,
+            "fetch":BulkUpdateFetch,
+            False:BulkUpdate
+        }, synchronize_session, query, values)
+
+    def _do_exec(self):
+        # emit an UPDATE against the single target table using the
+        # compiled WHERE criterion and the given SET values
+        update_stmt = sql.update(self.primary_table,
+            self.context.whereclause, self.values)
+
+        self.result = self.query.session.execute(
+            update_stmt, params=self.query._params)
+        self.rowcount = self.result.rowcount
+
+    def _do_post(self):
+        # fire the after_bulk_update session event
+        session = self.query.session
+        session.dispatch.after_bulk_update(session, self.query,
+                self.context, self.result)
+
+class BulkDelete(BulkUD):
+    """BulkUD which handles DELETEs."""
+
+    def __init__(self, query):
+        super(BulkDelete, self).__init__(query)
+        # reject SELECT-only modifiers (LIMIT/OFFSET/DISTINCT etc.)
+        self.query._no_select_modifiers("delete")
+
+    @classmethod
+    def factory(cls, query, synchronize_session):
+        # choose the BulkDelete variant per synchronize_session
+        return BulkUD._factory({
+            "evaluate":BulkDeleteEvaluate,
+            "fetch":BulkDeleteFetch,
+            False:BulkDelete
+        }, synchronize_session, query)
+
+    def _do_exec(self):
+        # emit a DELETE against the single target table using the
+        # compiled WHERE criterion
+        delete_stmt = sql.delete(self.primary_table,
+            self.context.whereclause)
+
+        self.result = self.query.session.execute(delete_stmt,
+            params=self.query._params)
+        self.rowcount = self.result.rowcount
+
+    def _do_post(self):
+        # fire the after_bulk_delete session event
+        session = self.query.session
+        session.dispatch.after_bulk_delete(session, self.query,
+                self.context, self.result)
+
+class BulkUpdateEvaluate(BulkEvaluate, BulkUpdate):
+    """BulkUD which handles UPDATEs using the "evaluate"
+    method of session resolution."""
+
+    def _additional_evaluators(self,evaluator_compiler):
+        # compile a Python-level evaluator for each SET value so the
+        # new value can be applied to in-session objects directly
+        self.value_evaluators = {}
+        for key,value in self.values.iteritems():
+            key = _attr_as_key(key)
+            self.value_evaluators[key] = evaluator_compiler.process(
+                expression._literal_as_binds(value))
+
+    def _do_post_synchronize(self):
+        # Apply the evaluated SET values to each matched in-session
+        # object, committing unmodified attributes and expiring those
+        # with pending (un-flushed) changes.
+        session = self.query.session
+        states = set()
+        evaluated_keys = self.value_evaluators.keys()
+        for obj in self.matched_objects:
+            state, dict_ = attributes.instance_state(obj),\
+                                attributes.instance_dict(obj)
+
+            # only evaluate unmodified attributes
+            to_evaluate = state.unmodified.intersection(
+                evaluated_keys)
+            for key in to_evaluate:
+                dict_[key] = self.value_evaluators[key](obj)
+
+            state.commit(dict_, list(to_evaluate))
+
+            # expire attributes with pending changes
+            # (there was no autoflush, so they are overwritten)
+            state.expire_attributes(dict_,
+                    set(evaluated_keys).
+                    difference(to_evaluate))
+            states.add(state)
+        # notify the Session that these states were altered
+        session._register_altered(states)
+
+class BulkDeleteEvaluate(BulkEvaluate, BulkDelete):
+    """BulkUD which handles DELETEs using the "evaluate"
+    method of session resolution."""
+
+    def _do_post_synchronize(self):
+        # evict the objects matched by the Python-evaluated criterion
+        # from the Session, as their rows have been deleted
+        self.query.session._remove_newly_deleted(
+            [attributes.instance_state(obj)
+            for obj in self.matched_objects])
+
+class BulkUpdateFetch(BulkFetch, BulkUpdate):
+    """BulkUD which handles UPDATEs using the "fetch"
+    method of session resolution."""
+
+    def _do_post_synchronize(self):
+        session = self.query.session
+        target_mapper = self.query._mapper_zero()
+
+        # collect the in-session states corresponding to the primary
+        # keys fetched before the UPDATE was emitted
+        # NOTE(review): this indexing assumes every matched primary key
+        # is present in the identity map — TODO confirm
+        states = set([
+            attributes.instance_state(session.identity_map[identity_key])
+            for identity_key in [
+                target_mapper.identity_key_from_primary_key(
+                    list(primary_key))
+                for primary_key in self.matched_rows
+            ]
+        ])
+        # expire the updated attributes so they are refreshed on next
+        # access, and mark the states as altered in the Session
+        attrib = [_attr_as_key(k) for k in self.values]
+        for state in states:
+            session._expire_state(state, attrib)
+        session._register_altered(states)
+
+class BulkDeleteFetch(BulkFetch, BulkDelete):
+    """BulkUD which handles DELETEs using the "fetch"
+    method of session resolution."""
+
+    def _do_post_synchronize(self):
+        # Evict from the Session each object whose primary key was
+        # fetched before the DELETE was emitted.
+        session = self.query.session
+        target_mapper = self.query._mapper_zero()
+        for primary_key in self.matched_rows:
+            # TODO: inline this and call remove_newly_deleted
+            # once
+            identity_key = target_mapper.identity_key_from_primary_key(
+                list(primary_key))
+            # rows with no corresponding in-session object are skipped
+            if identity_key in session.identity_map:
+                session._remove_newly_deleted(
+                    [attributes.instance_state(
+                        session.identity_map[identity_key]
+                    )]
+                )
from sqlalchemy import sql, util, log, schema
from sqlalchemy import exc as sa_exc
from sqlalchemy.orm import exc as orm_exc
+from sqlalchemy.orm import persistence
from sqlalchemy.sql import util as sql_util
from sqlalchemy.sql import expression, visitors, operators
from sqlalchemy.orm import (
represented as a common table expression (CTE).
The :meth:`.Query.cte` method is new in 0.7.6.
-
+
Parameters and usage are the same as those of the
:meth:`._SelectBase.cte` method; see that method for
further details.
-
+
Here is the `Postgresql WITH
RECURSIVE example <http://www.postgresql.org/docs/8.4/static/queries-with.html>`_.
Note that, in this example, the ``included_parts`` cte and the ``incl_alias`` alias
group_by(included_parts.c.sub_part)
See also:
-
+
:meth:`._SelectBase.cte`
"""
selectable=None,
polymorphic_on=None):
"""Load columns for inheriting classes.
-
+
:meth:`.Query.with_polymorphic` applies transformations
to the "main" mapped class represented by this :class:`.Query`.
The "main" mapped class here means the :class:`.Query`
subclass are available in the query, both for the purposes
of load-time efficiency as well as the ability to use
these columns at query time.
-
+
See the documentation section :ref:`with_polymorphic` for
details on how this method is used.
-
+
As of 0.8, a new and more flexible function
:func:`.orm.with_polymorphic` supersedes
:meth:`.Query.with_polymorphic`, as it can apply the equivalent
def get(self, ident):
"""Return an instance based on the given primary key identifier,
or ``None`` if not found.
-
+
E.g.::
-
+
my_user = session.query(User).get(5)
-
+
some_object = session.query(VersionedFoo).get((5, 10))
-
+
:meth:`~.Query.get` is special in that it provides direct
access to the identity map of the owning :class:`.Session`.
If the given primary key identifier is present
unless the object has been marked fully expired.
If not present,
a SELECT is performed in order to locate the object.
-
+
:meth:`~.Query.get` also will perform a check if
the object is present in the identity map and
marked as expired - a SELECT
is emitted to refresh the object as well as to
ensure that the row is still present.
If not, :class:`~sqlalchemy.orm.exc.ObjectDeletedError` is raised.
-
+
:meth:`~.Query.get` is only used to return a single
mapped instance, not multiple instances or
individual column constructs, and strictly
options via :meth:`~.Query.options` may be applied
however, and will be used if the object is not
yet locally present.
-
+
A lazy-loading, many-to-one attribute configured
by :func:`.relationship`, using a simple
foreign-key-to-primary-key criterion, will also use an
the target value from the local identity map
before querying the database. See :ref:`loading_toplevel`
for further details on relationship loading.
-
+
:param ident: A scalar or tuple value representing
the primary key. For a composite primary key,
the order of identifiers corresponds in most cases
to the elements present in this collection.
:return: The object instance, or ``None``.
-
+
"""
# convert composite types to individual args
"""Set the 'invoke all eagers' flag which causes joined- and
subquery loaders to traverse into already-loaded related objects
and collections.
-
+
Default is that of :attr:`.Query._invoke_all_eagers`.
"""
def with_transformation(self, fn):
"""Return a new :class:`.Query` object transformed by
the given function.
-
+
E.g.::
-
+
def filter_something(criterion):
def transform(q):
return q.filter(criterion)
return transform
-
+
q = q.with_transformation(filter_something(x==5))
-
+
This allows ad-hoc recipes to be created for :class:`.Query`
objects. See the example at :ref:`hybrid_transformers`.
``'read_nowait'`` - passes ``for_update='read_nowait'``, which
translates to ``FOR SHARE NOWAIT`` (supported by PostgreSQL).
-
+
New in 0.7.7: ``FOR SHARE`` and ``FOR SHARE NOWAIT`` (PostgreSQL)
"""
of this :class:`.Query`, using SQL expressions.
e.g.::
-
+
session.query(MyClass).filter(MyClass.name == 'some name')
-
+
Multiple criteria are joined together by AND (new in 0.7.5)::
-
+
session.query(MyClass).\\
filter(MyClass.name == 'some name', MyClass.id > 5)
-
+
The criterion is any SQL expression object applicable to the
WHERE clause of a select. String expressions are coerced
into SQL expression constructs via the :func:`.text` construct.
See also:
-
+
:meth:`.Query.filter_by` - filter on keyword expressions.
"""
def filter_by(self, **kwargs):
"""apply the given filtering criterion to a copy
of this :class:`.Query`, using keyword expressions.
-
+
e.g.::
-
+
session.query(MyClass).filter_by(name = 'some name')
-
+
Multiple criteria are joined together by AND::
-
+
session.query(MyClass).\\
filter_by(name = 'some name', id = 5)
-
+
The keyword expressions are extracted from the primary
entity of the query, or the last entity that was the
target of a call to :meth:`.Query.join`.
-
+
See also:
-
+
:meth:`.Query.filter` - filter on SQL expressions.
-
+
"""
clauses = [_entity_descriptor(self._joinpoint_zero(), key) == value
def join(self, *props, **kwargs):
"""Create a SQL JOIN against this :class:`.Query` object's criterion
and apply generatively, returning the newly resulting :class:`.Query`.
-
+
**Simple Relationship Joins**
-
+
Consider a mapping between two classes ``User`` and ``Address``,
with a relationship ``User.addresses`` representing a collection
of ``Address`` objects associated with each ``User``. The most common
usage of :meth:`~.Query.join` is to create a JOIN along this
relationship, using the ``User.addresses`` attribute as an indicator
for how this should occur::
-
+
q = session.query(User).join(User.addresses)
-
+
Where above, the call to :meth:`~.Query.join` along ``User.addresses``
will result in SQL equivalent to::
-
+
SELECT user.* FROM user JOIN address ON user.id = address.user_id
-
+
In the above example we refer to ``User.addresses`` as passed to
:meth:`~.Query.join` as the *on clause*, that is, it indicates
how the "ON" portion of the JOIN should be constructed. For a
single-entity query such as the one above (i.e. we start by selecting only from
``User`` and nothing else), the relationship can also be specified by its
string name::
-
+
q = session.query(User).join("addresses")
-
+
:meth:`~.Query.join` can also accommodate multiple
"on clause" arguments to produce a chain of joins, such as below
where a join across four related entities is constructed::
-
+
q = session.query(User).join("orders", "items", "keywords")
-
+
The above would be shorthand for three separate calls to :meth:`~.Query.join`,
each using an explicit attribute to indicate the source entity::
-
+
q = session.query(User).\\
join(User.orders).\\
join(Order.items).\\
join(Item.keywords)
-
+
**Joins to a Target Entity or Selectable**
-
+
A second form of :meth:`~.Query.join` allows any mapped entity
or core selectable construct as a target. In this usage,
:meth:`~.Query.join` will attempt
to create a JOIN along the natural foreign key relationship between
two entities::
-
+
q = session.query(User).join(Address)
-
+
The above calling form of :meth:`.join` will raise an error if
either there are no foreign keys between the two entities, or if
there are multiple foreign key linkages between them. In the
above calling form, :meth:`~.Query.join` is called upon to
create the "on clause" automatically for us. The target can
be any mapped entity or selectable, such as a :class:`.Table`::
-
+
q = session.query(User).join(addresses_table)
-
+
**Joins to a Target with an ON Clause**
-
+
The third calling form allows both the target entity as well
as the ON clause to be passed explicitly. Suppose for
example we wanted to join to ``Address`` twice, using
to it using the ``target, onclause`` form, so that the
alias can be specified explicitly as the target along with
the relationship to instruct how the ON clause should proceed::
-
+
a_alias = aliased(Address)
-
+
q = session.query(User).\\
join(User.addresses).\\
join(a_alias, User.addresses).\\
filter(Address.email_address=='ed@foo.com').\\
filter(a_alias.email_address=='ed@bar.com')
-
+
Where above, the generated SQL would be similar to::
-
+
SELECT user.* FROM user
JOIN address ON user.id = address.user_id
JOIN address AS address_1 ON user.id=address_1.user_id
WHERE address.email_address = :email_address_1
AND address_1.email_address = :email_address_2
-
+
The two-argument calling form of :meth:`~.Query.join`
also allows us to construct arbitrary joins with SQL-oriented
"on clause" expressions, not relying upon configured relationships
at all. Any SQL expression can be passed as the ON clause
when using the two-argument form, which should refer to the target
entity in some way as well as an applicable source entity::
-
+
q = session.query(User).join(Address, User.id==Address.user_id)
-
+
.. note::
-
+
In SQLAlchemy 0.6 and earlier, the two argument form of
:meth:`~.Query.join` requires the usage of a tuple::
-
+
query(User).join((Address, User.id==Address.user_id))
-
+
This calling form is accepted in 0.7 and further, though
is not necessary unless multiple join conditions are passed to
a single :meth:`~.Query.join` call, which itself is also not
generally necessary as it is now equivalent to multiple
calls (this wasn't always the case).
-
+
**Advanced Join Targeting and Adaption**
There is a lot of flexibility in what the "target" can be when using
q = session.query(User).\\
join(addresses_q, addresses_q.c.user_id==User.id)
-
+
:meth:`~.Query.join` also features the ability to *adapt* a
:meth:`~sqlalchemy.orm.relationship` -driven ON clause to the target selectable.
Below we construct a JOIN from ``User`` to a subquery against ``Address``, allowing
the relationship denoted by ``User.addresses`` to *adapt* itself
to the altered target::
-
+
address_subq = session.query(Address).\\
filter(Address.email_address == 'ed@foo.com').\\
subquery()
q = session.query(User).join(address_subq, User.addresses)
-
+
Producing SQL similar to::
-
+
SELECT user.* FROM user
JOIN (
SELECT address.id AS id,
FROM address
WHERE address.email_address = :email_address_1
) AS anon_1 ON user.id = anon_1.user_id
-
+
The above form allows one to fall back onto an explicit ON
clause at any time::
-
+
q = session.query(User).\\
join(address_subq, User.id==address_subq.c.user_id)
-
+
**Controlling what to Join From**
-
+
While :meth:`~.Query.join` exclusively deals with the "right"
side of the JOIN, we can also control the "left" side, in those
cases where it's needed, using :meth:`~.Query.select_from`.
make usage of ``User.addresses`` as our ON clause by instructing
the :class:`.Query` to select first from the ``User``
entity::
-
+
q = session.query(Address).select_from(User).\\
join(User.addresses).\\
filter(User.name == 'ed')
-
+
Which will produce SQL similar to::
-
+
SELECT address.* FROM user
JOIN address ON user.id=address.user_id
WHERE user.name = :name_1
-
+
**Constructing Aliases Anonymously**
-
+
:meth:`~.Query.join` can construct anonymous aliases
using the ``aliased=True`` flag. This feature is useful
when a query is being joined algorithmically, such as
when querying self-referentially to an arbitrary depth::
-
+
q = session.query(Node).\\
join("children", "children", aliased=True)
-
+
When ``aliased=True`` is used, the actual "alias" construct
is not explicitly available. To work with it, methods such as
:meth:`.Query.filter` will adapt the incoming entity to
the last join point::
-
+
q = session.query(Node).\\
join("children", "children", aliased=True).\\
filter(Node.name == 'grandchild 1')
-
+
When using automatic aliasing, the ``from_joinpoint=True``
argument can allow a multi-node join to be broken into
multiple calls to :meth:`~.Query.join`, so that
each path along the way can be further filtered::
-
+
q = session.query(Node).\\
join("children", aliased=True).\\
filter(Node.name == 'child 1').\\
join("children", aliased=True, from_joinpoint=True).\\
filter(Node.name == 'grandchild 1')
-
+
The filtering aliases above can then be reset back to the
original ``Node`` entity using :meth:`~.Query.reset_joinpoint`::
-
+
q = session.query(Node).\\
join("children", "children", aliased=True).\\
filter(Node.name == 'grandchild 1').\\
reset_joinpoint().\\
filter(Node.name == 'parent 1')
-
+
For an example of ``aliased=True``, see the distribution
example :ref:`examples_xmlpersistence` which illustrates
an XPath-like query system using algorithmic joins.
-
+
:param *props: A collection of one or more join conditions,
each consisting of a relationship-bound attribute or string
relationship name representing an "on clause", or a single
of True here will cause the join to be from the most recent
joined target, rather than starting back from the original
FROM clauses of the query.
-
+
See also:
-
+
:ref:`ormtutorial_joins` in the ORM tutorial.
:ref:`inheritance_toplevel` for details on how :meth:`~.Query.join`
is used for inheritance relationships.
-
+
:func:`.orm.join` - a standalone ORM-level join function,
used internally by :meth:`.Query.join`, which in previous
SQLAlchemy versions was the primary ORM-level joining interface.
-
+
"""
aliased, from_joinpoint = kwargs.pop('aliased', False),\
kwargs.pop('from_joinpoint', False)
def reset_joinpoint(self):
"""Return a new :class:`.Query`, where the "join point" has
been reset back to the base FROM entities of the query.
-
+
This method is usually used in conjunction with the
``aliased=True`` feature of the :meth:`~.Query.join`
method. See the example in :meth:`~.Query.join` for how
Mapped entities or plain :class:`~.Table` or other selectables
can be sent here which will form the default FROM clause.
-
+
See the example in :meth:`~.Query.join` for a typical
usage of :meth:`~.Query.select_from`.
SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL users.name AS users_name
FROM users
-
+
New in 0.7.7.
"""
this :class:`.Query` - if these do not correspond, unchecked errors will occur.
The 'load' argument is the same as that of :meth:`.Session.merge`.
-
+
For an example of how :meth:`~.Query.merge_result` is used, see
the source code for the example :ref:`examples_caching`, where
:meth:`~.Query.merge_result` is used to efficiently restore state
def count(self):
"""Return a count of rows this Query would return.
-
+
This generates the SQL for this Query as follows::
-
+
SELECT count(1) AS count_1 FROM (
SELECT <rest of query follows...>
) AS anon_1
Note the above scheme is newly refined in 0.7
(as of 0.7b3).
-
+
For fine grained control over specific columns
to count, to skip the usage of a subquery or
otherwise control of the FROM clause,
or to use other aggregate functions,
use :attr:`~sqlalchemy.sql.expression.func` expressions in conjunction
with :meth:`~.Session.query`, i.e.::
-
+
from sqlalchemy import func
-
+
# count User records, without
# using a subquery.
session.query(func.count(User.id))
-
+
# return count of user "id" grouped
# by "name"
session.query(func.count(User.id)).\\
group_by(User.name)
from sqlalchemy import distinct
-
+
# count distinct "name" values
session.query(func.count(distinct(User.name)))
-
+
"""
col = sql.func.count(sql.literal_column('*'))
return self.from_self(col).scalar()
invokes :meth:`.SessionEvents.after_bulk_delete`.
"""
- #TODO: lots of duplication and ifs - probably needs to be
- # refactored to strategies
#TODO: cascades need handling.
- if synchronize_session not in [False, 'evaluate', 'fetch']:
- raise sa_exc.ArgumentError(
- "Valid strategies for session "
- "synchronization are False, 'evaluate' and "
- "'fetch'")
- self._no_select_modifiers("delete")
-
- self = self.enable_eagerloads(False)
-
- context = self._compile_context()
- if len(context.statement.froms) != 1 or \
- not isinstance(context.statement.froms[0], schema.Table):
- raise sa_exc.ArgumentError("Only deletion via a single table "
- "query is currently supported")
- primary_table = context.statement.froms[0]
-
- session = self.session
-
- if self._autoflush:
- session._autoflush()
-
- if synchronize_session == 'evaluate':
- try:
- evaluator_compiler = evaluator.EvaluatorCompiler()
- if self.whereclause is not None:
- eval_condition = evaluator_compiler.process(
- self.whereclause)
- else:
- def eval_condition(obj):
- return True
-
- except evaluator.UnevaluatableError:
- raise sa_exc.InvalidRequestError(
- "Could not evaluate current criteria in Python. "
- "Specify 'fetch' or False for the synchronize_session "
- "parameter.")
-
- target_cls = self._mapper_zero().class_
-
- #TODO: detect when the where clause is a trivial primary key match
- objs_to_expunge = [
- obj for (cls, pk),obj in
- session.identity_map.iteritems()
- if issubclass(cls, target_cls) and
- eval_condition(obj)]
-
- elif synchronize_session == 'fetch':
- #TODO: use RETURNING when available
- select_stmt = context.statement.with_only_columns(
- primary_table.primary_key)
- matched_rows = session.execute(
- select_stmt,
- params=self._params).fetchall()
-
- delete_stmt = sql.delete(primary_table, context.whereclause)
-
- result = session.execute(delete_stmt, params=self._params)
-
- if synchronize_session == 'evaluate':
- session._remove_newly_deleted([attributes.instance_state(obj)
- for obj in objs_to_expunge])
- elif synchronize_session == 'fetch':
- target_mapper = self._mapper_zero()
- for primary_key in matched_rows:
- # TODO: inline this and call remove_newly_deleted
- # once
- identity_key = target_mapper.identity_key_from_primary_key(
- list(primary_key))
- if identity_key in session.identity_map:
- session._remove_newly_deleted(
- [attributes.instance_state(
- session.identity_map[identity_key]
- )]
- )
-
- session.dispatch.after_bulk_delete(session, self, context, result)
-
- return result.rowcount
+ delete_op = persistence.BulkDelete.factory(self, synchronize_session)
+ delete_op.exec_()
+ return delete_op.rowcount
def update(self, values, synchronize_session='evaluate'):
"""Perform a bulk update query.
# fk assignments
#TODO: cascades need handling.
- if synchronize_session == 'expire':
- util.warn_deprecated("The 'expire' value as applied to "
- "the synchronize_session argument of "
- "query.update() is now called 'fetch'")
- synchronize_session = 'fetch'
-
- if synchronize_session not in [False, 'evaluate', 'fetch']:
- raise sa_exc.ArgumentError(
- "Valid strategies for session synchronization "
- "are False, 'evaluate' and 'fetch'")
- self._no_select_modifiers("update")
-
- self = self.enable_eagerloads(False)
-
- context = self._compile_context()
- if len(context.statement.froms) != 1 or \
- not isinstance(context.statement.froms[0], schema.Table):
- raise sa_exc.ArgumentError(
- "Only update via a single table query is "
- "currently supported")
- primary_table = context.statement.froms[0]
-
- session = self.session
-
- if self._autoflush:
- session._autoflush()
-
- if synchronize_session == 'evaluate':
- try:
- evaluator_compiler = evaluator.EvaluatorCompiler()
- if self.whereclause is not None:
- eval_condition = evaluator_compiler.process(
- self.whereclause)
- else:
- def eval_condition(obj):
- return True
-
- value_evaluators = {}
- for key,value in values.iteritems():
- key = _attr_as_key(key)
- value_evaluators[key] = evaluator_compiler.process(
- expression._literal_as_binds(value))
- except evaluator.UnevaluatableError:
- raise sa_exc.InvalidRequestError(
- "Could not evaluate current criteria in Python. "
- "Specify 'fetch' or False for the "
- "synchronize_session parameter.")
- target_cls = self._mapper_zero().class_
- matched_objects = []
- for (cls, pk),obj in session.identity_map.iteritems():
- evaluated_keys = value_evaluators.keys()
-
- if issubclass(cls, target_cls) and eval_condition(obj):
- matched_objects.append(obj)
-
- elif synchronize_session == 'fetch':
- select_stmt = context.statement.with_only_columns(
- primary_table.primary_key)
- matched_rows = session.execute(
- select_stmt,
- params=self._params).fetchall()
-
- update_stmt = sql.update(primary_table, context.whereclause, values)
-
- result = session.execute(update_stmt, params=self._params)
-
- if synchronize_session == 'evaluate':
- target_cls = self._mapper_zero().class_
- states = set()
- for obj in matched_objects:
- state, dict_ = attributes.instance_state(obj),\
- attributes.instance_dict(obj)
-
- # only evaluate unmodified attributes
- to_evaluate = state.unmodified.intersection(
- evaluated_keys)
- for key in to_evaluate:
- dict_[key] = value_evaluators[key](obj)
-
- state.commit(dict_, list(to_evaluate))
-
- # expire attributes with pending changes
- # (there was no autoflush, so they are overwritten)
- state.expire_attributes(dict_,
- set(evaluated_keys).
- difference(to_evaluate))
- states.add(state)
- session._register_altered(states)
-
- elif synchronize_session == 'fetch':
- target_mapper = self._mapper_zero()
-
- states = set([
- attributes.instance_state(session.identity_map[identity_key])
- for identity_key in [
- target_mapper.identity_key_from_primary_key(
- list(primary_key))
- for primary_key in matched_rows
- ]
- ])
- attrib = [_attr_as_key(k) for k in values]
- for state in states:
- session._expire_state(state, attrib)
- session._register_altered(states)
-
- session.dispatch.after_bulk_update(session, self, context, result)
+ update_op = persistence.BulkUpdate.factory(self, synchronize_session, values)
+ update_op.exec_()
+ return update_op.rowcount
- return result.rowcount
def _compile_context(self, labels=True):
context = QueryContext(self)