From: Mike Bayer Date: Sun, 29 Apr 2012 22:53:29 +0000 (-0400) Subject: - refactor query.update() and query.delete() to use a pure X-Git-Tag: rel_0_8_0b1~437 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=f10c2f3dc38c627c108c88f1190dd4e9c4512a94;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - refactor query.update() and query.delete() to use a pure template method pattern using new class hierarchy BulkUD in sqlalchemy.orm.persistence --- diff --git a/lib/sqlalchemy/orm/persistence.py b/lib/sqlalchemy/orm/persistence.py index 55b9bf84a1..6e455d9c76 100644 --- a/lib/sqlalchemy/orm/persistence.py +++ b/lib/sqlalchemy/orm/persistence.py @@ -16,11 +16,13 @@ in unitofwork.py. import operator from itertools import groupby -from sqlalchemy import sql, util, exc as sa_exc +from sqlalchemy import sql, util, exc as sa_exc, schema from sqlalchemy.orm import attributes, sync, \ - exc as orm_exc + exc as orm_exc,\ + evaluator -from sqlalchemy.orm.util import _state_mapper, state_str +from sqlalchemy.orm.util import _state_mapper, state_str, _attr_as_key +from sqlalchemy.sql import expression def save_obj(base_mapper, states, uowtransaction, single=False): """Issue ``INSERT`` and/or ``UPDATE`` statements for a list @@ -121,13 +123,13 @@ def delete_obj(base_mapper, states, uowtransaction): def _organize_states_for_save(base_mapper, states, uowtransaction): """Make an initial pass across a set of states for INSERT or UPDATE. - + This includes splitting out into distinct lists for each, calling before_insert/before_update, obtaining key information for each state including its dictionary, mapper, the connection to use for the execution per state, and the identity flag. - + """ states_to_insert = [] @@ -191,22 +193,22 @@ def _organize_states_for_post_update(base_mapper, states, uowtransaction): """Make an initial pass across a set of states for UPDATE corresponding to post_update. - + This includes obtaining key information for each state including its dictionary, mapper, the connection to use for the execution per state. - + """ return list(_connections_for_states(base_mapper, uowtransaction, states)) def _organize_states_for_delete(base_mapper, states, uowtransaction): """Make an initial pass across a set of states for DELETE. - + This includes calling out before_delete and obtaining key information for each state including its dictionary, mapper, the connection to use for the execution per state. - + """ states_to_delete = [] @@ -224,7 +226,7 @@ def _collect_insert_commands(base_mapper, uowtransaction, table, states_to_insert): """Identify sets of values to use in INSERT statements for a list of states. - + """ insert = [] for state, state_dict, mapper, connection, has_identity, \ @@ -267,7 +269,7 @@ def _collect_update_commands(base_mapper, uowtransaction, table, states_to_update): """Identify sets of values to use in UPDATE statements for a list of states. - + This function works intricately with the history system to determine exactly what values should be updated as well as how the row should be matched within an UPDATE @@ -675,7 +677,7 @@ def _finalize_insert_update_commands(base_mapper, uowtransaction, states_to_insert, states_to_update): """finalize state on states that have been inserted or updated, including calling after_insert/after_update events. - + """ for state, state_dict, mapper, connection, has_identity, \ instance_key, row_switch in states_to_insert + \ @@ -735,11 +737,11 @@ def _postfetch(mapper, uowtransaction, table, def _connections_for_states(base_mapper, uowtransaction, states): """Return an iterator of (state, state.dict, mapper, connection). - + The states are sorted according to _sort_states, then paired with the connection they should be using for the given unit of work transaction. - + """ # if session has a connection callable, # organize individual states with the connection @@ -774,4 +776,235 @@ def _sort_states(states): return sorted(pending, key=operator.attrgetter("insert_order")) + \ sorted(persistent, key=lambda q:q.key[1]) +class BulkUD(object): + """Handle bulk update and deletes via a :class:`.Query`.""" + + def __init__(self, query): + self.query = query.enable_eagerloads(False) + + @classmethod + def _factory(cls, lookup, synchronize_session, *arg): + try: + klass = lookup[synchronize_session] + except KeyError: + raise sa_exc.ArgumentError( + "Valid strategies for session synchronization " + "are %s" % (", ".join(sorted(repr(x) + for x in lookup.keys())))) + else: + return klass(*arg) + + def exec_(self): + self._do_pre() + self._do_pre_synchronize() + self._do_exec() + self._do_post_synchronize() + self._do_post() + + def _do_pre(self): + query = self.query + self.context = context = query._compile_context() + if len(context.statement.froms) != 1 or \ + not isinstance(context.statement.froms[0], schema.Table): + raise sa_exc.ArgumentError( + "Only update via a single table query is " + "currently supported") + self.primary_table = context.statement.froms[0] + + session = query.session + + if query._autoflush: + session._autoflush() + + def _do_pre_synchronize(self): + pass + + def _do_post_synchronize(self): + pass + +class BulkEvaluate(BulkUD): + """BulkUD which does the 'evaluate' method of session state resolution.""" + + def _additional_evaluators(self, evaluator_compiler): + pass + + def _do_pre_synchronize(self): + query = self.query + try: + evaluator_compiler = evaluator.EvaluatorCompiler() + if query.whereclause is not None: + eval_condition = evaluator_compiler.process( + query.whereclause) + else: + def eval_condition(obj): + return True + + self._additional_evaluators(evaluator_compiler) + + except evaluator.UnevaluatableError: + raise sa_exc.InvalidRequestError( + "Could not evaluate current criteria in Python. " + "Specify 'fetch' or False for the " + "synchronize_session parameter.") + target_cls = query._mapper_zero().class_ + + #TODO: detect when the where clause is a trivial primary key match + self.matched_objects = [ + obj for (cls, pk),obj in + query.session.identity_map.iteritems() + if issubclass(cls, target_cls) and + eval_condition(obj)] + +class BulkFetch(BulkUD): + """BulkUD which does the 'fetch' method of session state resolution.""" + + def _do_pre_synchronize(self): + query = self.query + session = query.session + select_stmt = self.context.statement.with_only_columns( + self.primary_table.primary_key) + self.matched_rows = session.execute( + select_stmt, + params=query._params).fetchall() + +class BulkUpdate(BulkUD): + """BulkUD which handles UPDATEs.""" + + def __init__(self, query, values): + super(BulkUpdate, self).__init__(query) + self.query._no_select_modifiers("update") + self.values = values + + @classmethod + def factory(cls, query, synchronize_session, values): + return BulkUD._factory({ + "evaluate":BulkUpdateEvaluate, + "fetch":BulkUpdateFetch, + False:BulkUpdate + }, synchronize_session, query, values) + + def _do_exec(self): + update_stmt = sql.update(self.primary_table, + self.context.whereclause, self.values) + + self.result = self.query.session.execute( + update_stmt, params=self.query._params) + self.rowcount = self.result.rowcount + + def _do_post(self): + session = self.query.session + session.dispatch.after_bulk_update(session, self.query, + self.context, self.result) + +class BulkDelete(BulkUD): + """BulkUD which handles DELETEs.""" + + def __init__(self, query): + super(BulkDelete, self).__init__(query) + self.query._no_select_modifiers("delete") + + @classmethod + def factory(cls, query, synchronize_session): + return BulkUD._factory({ + "evaluate":BulkDeleteEvaluate, + "fetch":BulkDeleteFetch, + False:BulkDelete + }, synchronize_session, query) + + def _do_exec(self): + delete_stmt = sql.delete(self.primary_table, + self.context.whereclause) + + self.result = self.query.session.execute(delete_stmt, + params=self.query._params) + self.rowcount = self.result.rowcount + + def _do_post(self): + session = self.query.session + session.dispatch.after_bulk_delete(session, self.query, + self.context, self.result) + +class BulkUpdateEvaluate(BulkEvaluate, BulkUpdate): + """BulkUD which handles UPDATEs using the "evaluate" + method of session resolution.""" + + def _additional_evaluators(self,evaluator_compiler): + self.value_evaluators = {} + for key,value in self.values.iteritems(): + key = _attr_as_key(key) + self.value_evaluators[key] = evaluator_compiler.process( + expression._literal_as_binds(value)) + + def _do_post_synchronize(self): + session = self.query.session + states = set() + evaluated_keys = self.value_evaluators.keys() + for obj in self.matched_objects: + state, dict_ = attributes.instance_state(obj),\ + attributes.instance_dict(obj) + + # only evaluate unmodified attributes + to_evaluate = state.unmodified.intersection( + evaluated_keys) + for key in to_evaluate: + dict_[key] = self.value_evaluators[key](obj) + + state.commit(dict_, list(to_evaluate)) + + # expire attributes with pending changes + # (there was no autoflush, so they are overwritten) + state.expire_attributes(dict_, + set(evaluated_keys). + difference(to_evaluate)) + states.add(state) + session._register_altered(states) + +class BulkDeleteEvaluate(BulkEvaluate, BulkDelete): + """BulkUD which handles DELETEs using the "evaluate" + method of session resolution.""" + + def _do_post_synchronize(self): + self.query.session._remove_newly_deleted( + [attributes.instance_state(obj) + for obj in self.matched_objects]) + +class BulkUpdateFetch(BulkFetch, BulkUpdate): + """BulkUD which handles UPDATEs using the "fetch" + method of session resolution.""" + + def _do_post_synchronize(self): + session = self.query.session + target_mapper = self.query._mapper_zero() + + states = set([ + attributes.instance_state(session.identity_map[identity_key]) + for identity_key in [ + target_mapper.identity_key_from_primary_key( + list(primary_key)) + for primary_key in self.matched_rows + ] + ]) + attrib = [_attr_as_key(k) for k in self.values] + for state in states: + session._expire_state(state, attrib) + session._register_altered(states) + +class BulkDeleteFetch(BulkFetch, BulkDelete): + """BulkUD which handles DELETEs using the "fetch" + method of session resolution.""" + + def _do_post_synchronize(self): + session = self.query.session + target_mapper = self.query._mapper_zero() + for primary_key in self.matched_rows: + # TODO: inline this and call remove_newly_deleted + # once + identity_key = target_mapper.identity_key_from_primary_key( + list(primary_key)) + if identity_key in session.identity_map: + session._remove_newly_deleted( + [attributes.instance_state( + session.identity_map[identity_key] + )] + ) diff --git a/lib/sqlalchemy/orm/query.py b/lib/sqlalchemy/orm/query.py index 5cf9ea5cfe..77d3cd2d33 100644 --- a/lib/sqlalchemy/orm/query.py +++ b/lib/sqlalchemy/orm/query.py @@ -24,6 +24,7 @@ from operator import itemgetter from sqlalchemy import sql, util, log, schema from sqlalchemy import exc as sa_exc from sqlalchemy.orm import exc as orm_exc +from sqlalchemy.orm import persistence from sqlalchemy.sql import util as sql_util from sqlalchemy.sql import expression, visitors, operators from sqlalchemy.orm import ( @@ -450,11 +451,11 @@ class Query(object): represented as a common table expression (CTE). The :meth:`.Query.cte` method is new in 0.7.6. - + Parameters and usage are the same as those of the :meth:`._SelectBase.cte` method; see that method for further details. - + Here is the `Postgresql WITH RECURSIVE example `_. Note that, in this example, the ``included_parts`` cte and the ``incl_alias`` alias @@ -495,7 +496,7 @@ class Query(object): group_by(included_parts.c.sub_part) See also: - + :meth:`._SelectBase.cte` """ @@ -614,7 +615,7 @@ class Query(object): selectable=None, polymorphic_on=None): """Load columns for inheriting classes. - + :meth:`.Query.with_polymorphic` applies transformations to the "main" mapped class represented by this :class:`.Query`. The "main" mapped class here means the :class:`.Query` @@ -624,10 +625,10 @@ class Query(object): subclass are available in the query, both for the purposes of load-time efficiency as well as the ability to use these columns at query time. - + See the documentation section :ref:`with_polymorphic` for details on how this method is used. - + As of 0.8, a new and more flexible function :func:`.orm.with_polymorphic` supersedes :meth:`.Query.with_polymorphic`, as it can apply the equivalent @@ -677,13 +678,13 @@ class Query(object): def get(self, ident): """Return an instance based on the given primary key identifier, or ``None`` if not found. - + E.g.:: - + my_user = session.query(User).get(5) - + some_object = session.query(VersionedFoo).get((5, 10)) - + :meth:`~.Query.get` is special in that it provides direct access to the identity map of the owning :class:`.Session`. If the given primary key identifier is present @@ -692,14 +693,14 @@ class Query(object): unless the object has been marked fully expired. If not present, a SELECT is performed in order to locate the object. - + :meth:`~.Query.get` also will perform a check if the object is present in the identity map and marked as expired - a SELECT is emitted to refresh the object as well as to ensure that the row is still present. If not, :class:`~sqlalchemy.orm.exc.ObjectDeletedError` is raised. - + :meth:`~.Query.get` is only used to return a single mapped instance, not multiple instances or individual column constructs, and strictly @@ -710,7 +711,7 @@ class Query(object): options via :meth:`~.Query.options` may be applied however, and will be used if the object is not yet locally present. - + A lazy-loading, many-to-one attribute configured by :func:`.relationship`, using a simple foreign-key-to-primary-key criterion, will also use an @@ -718,7 +719,7 @@ class Query(object): the target value from the local identity map before querying the database. See :ref:`loading_toplevel` for further details on relationship loading. - + :param ident: A scalar or tuple value representing the primary key. For a composite primary key, the order of identifiers corresponds in most cases @@ -729,7 +730,7 @@ class Query(object): to the elements present in this collection. :return: The object instance, or ``None``. - + """ # convert composite types to individual args @@ -818,7 +819,7 @@ class Query(object): """Set the 'invoke all eagers' flag which causes joined- and subquery loaders to traverse into already-loaded related objects and collections. - + Default is that of :attr:`.Query._invoke_all_eagers`. """ @@ -1033,16 +1034,16 @@ class Query(object): def with_transformation(self, fn): """Return a new :class:`.Query` object transformed by the given function. - + E.g.:: - + def filter_something(criterion): def transform(q): return q.filter(criterion) return transform - + q = q.with_transformation(filter_something(x==5)) - + This allows ad-hoc recipes to be created for :class:`.Query` objects. See the example at :ref:`hybrid_transformers`. @@ -1102,7 +1103,7 @@ class Query(object): ``'read_nowait'`` - passes ``for_update='read_nowait'``, which translates to ``FOR SHARE NOWAIT`` (supported by PostgreSQL). - + New in 0.7.7: ``FOR SHARE`` and ``FOR SHARE NOWAIT`` (PostgreSQL) """ @@ -1134,20 +1135,20 @@ class Query(object): of this :class:`.Query`, using SQL expressions. e.g.:: - + session.query(MyClass).filter(MyClass.name == 'some name') - + Multiple criteria are joined together by AND (new in 0.7.5):: - + session.query(MyClass).\\ filter(MyClass.name == 'some name', MyClass.id > 5) - + The criterion is any SQL expression object applicable to the WHERE clause of a select. String expressions are coerced into SQL expression constructs via the :func:`.text` construct. See also: - + :meth:`.Query.filter_by` - filter on keyword expressions. """ @@ -1171,24 +1172,24 @@ class Query(object): def filter_by(self, **kwargs): """apply the given filtering criterion to a copy of this :class:`.Query`, using keyword expressions. - + e.g.:: - + session.query(MyClass).filter_by(name = 'some name') - + Multiple criteria are joined together by AND:: - + session.query(MyClass).\\ filter_by(name = 'some name', id = 5) - + The keyword expressions are extracted from the primary entity of the query, or the last entity that was the target of a call to :meth:`.Query.join`. - + See also: - + :meth:`.Query.filter` - filter on SQL expressions. - + """ clauses = [_entity_descriptor(self._joinpoint_zero(), key) == value @@ -1362,67 +1363,67 @@ class Query(object): def join(self, *props, **kwargs): """Create a SQL JOIN against this :class:`.Query` object's criterion and apply generatively, returning the newly resulting :class:`.Query`. - + **Simple Relationship Joins** - + Consider a mapping between two classes ``User`` and ``Address``, with a relationship ``User.addresses`` representing a collection of ``Address`` objects associated with each ``User``. The most common usage of :meth:`~.Query.join` is to create a JOIN along this relationship, using the ``User.addresses`` attribute as an indicator for how this should occur:: - + q = session.query(User).join(User.addresses) - + Where above, the call to :meth:`~.Query.join` along ``User.addresses`` will result in SQL equivalent to:: - + SELECT user.* FROM user JOIN address ON user.id = address.user_id - + In the above example we refer to ``User.addresses`` as passed to :meth:`~.Query.join` as the *on clause*, that is, it indicates how the "ON" portion of the JOIN should be constructed. For a single-entity query such as the one above (i.e. we start by selecting only from ``User`` and nothing else), the relationship can also be specified by its string name:: - + q = session.query(User).join("addresses") - + :meth:`~.Query.join` can also accommodate multiple "on clause" arguments to produce a chain of joins, such as below where a join across four related entities is constructed:: - + q = session.query(User).join("orders", "items", "keywords") - + The above would be shorthand for three separate calls to :meth:`~.Query.join`, each using an explicit attribute to indicate the source entity:: - + q = session.query(User).\\ join(User.orders).\\ join(Order.items).\\ join(Item.keywords) - + **Joins to a Target Entity or Selectable** - + A second form of :meth:`~.Query.join` allows any mapped entity or core selectable construct as a target. In this usage, :meth:`~.Query.join` will attempt to create a JOIN along the natural foreign key relationship between two entities:: - + q = session.query(User).join(Address) - + The above calling form of :meth:`.join` will raise an error if either there are no foreign keys between the two entities, or if there are multiple foreign key linkages between them. In the above calling form, :meth:`~.Query.join` is called upon to create the "on clause" automatically for us. The target can be any mapped entity or selectable, such as a :class:`.Table`:: - + q = session.query(User).join(addresses_table) - + **Joins to a Target with an ON Clause** - + The third calling form allows both the target entity as well as the ON clause to be passed explicitly. Suppose for example we wanted to join to ``Address`` twice, using @@ -1431,45 +1432,45 @@ class Query(object): to it using the ``target, onclause`` form, so that the alias can be specified explicitly as the target along with the relationship to instruct how the ON clause should proceed:: - + a_alias = aliased(Address) - + q = session.query(User).\\ join(User.addresses).\\ join(a_alias, User.addresses).\\ filter(Address.email_address=='ed@foo.com').\\ filter(a_alias.email_address=='ed@bar.com') - + Where above, the generated SQL would be similar to:: - + SELECT user.* FROM user JOIN address ON user.id = address.user_id JOIN address AS address_1 ON user.id=address_1.user_id WHERE address.email_address = :email_address_1 AND address_1.email_address = :email_address_2 - + The two-argument calling form of :meth:`~.Query.join` also allows us to construct arbitrary joins with SQL-oriented "on clause" expressions, not relying upon configured relationships at all. Any SQL expression can be passed as the ON clause when using the two-argument form, which should refer to the target entity in some way as well as an applicable source entity:: - + q = session.query(User).join(Address, User.id==Address.user_id) - + .. note:: - + In SQLAlchemy 0.6 and earlier, the two argument form of :meth:`~.Query.join` requires the usage of a tuple:: - + query(User).join((Address, User.id==Address.user_id)) - + This calling form is accepted in 0.7 and further, though is not necessary unless multiple join conditions are passed to a single :meth:`~.Query.join` call, which itself is also not generally necessary as it is now equivalent to multiple calls (this wasn't always the case). - + **Advanced Join Targeting and Adaption** There is a lot of flexibility in what the "target" can be when using @@ -1483,21 +1484,21 @@ class Query(object): q = session.query(User).\\ join(addresses_q, addresses_q.c.user_id==User.id) - + :meth:`~.Query.join` also features the ability to *adapt* a :meth:`~sqlalchemy.orm.relationship` -driven ON clause to the target selectable. Below we construct a JOIN from ``User`` to a subquery against ``Address``, allowing the relationship denoted by ``User.addresses`` to *adapt* itself to the altered target:: - + address_subq = session.query(Address).\\ filter(Address.email_address == 'ed@foo.com').\\ subquery() q = session.query(User).join(address_subq, User.addresses) - + Producing SQL similar to:: - + SELECT user.* FROM user JOIN ( SELECT address.id AS id, @@ -1506,15 +1507,15 @@ class Query(object): FROM address WHERE address.email_address = :email_address_1 ) AS anon_1 ON user.id = anon_1.user_id - + The above form allows one to fall back onto an explicit ON clause at any time:: - + q = session.query(User).\\ join(address_subq, User.id==address_subq.c.user_id) - + **Controlling what to Join From** - + While :meth:`~.Query.join` exclusively deals with the "right" side of the JOIN, we can also control the "left" side, in those cases where it's needed, using :meth:`~.Query.select_from`. @@ -1522,60 +1523,60 @@ class Query(object): make usage of ``User.addresses`` as our ON clause by instructing the :class:`.Query` to select first from the ``User`` entity:: - + q = session.query(Address).select_from(User).\\ join(User.addresses).\\ filter(User.name == 'ed') - + Which will produce SQL similar to:: - + SELECT address.* FROM user JOIN address ON user.id=address.user_id WHERE user.name = :name_1 - + **Constructing Aliases Anonymously** - + :meth:`~.Query.join` can construct anonymous aliases using the ``aliased=True`` flag. This feature is useful when a query is being joined algorithmically, such as when querying self-referentially to an arbitrary depth:: - + q = session.query(Node).\\ join("children", "children", aliased=True) - + When ``aliased=True`` is used, the actual "alias" construct is not explicitly available. To work with it, methods such as :meth:`.Query.filter` will adapt the incoming entity to the last join point:: - + q = session.query(Node).\\ join("children", "children", aliased=True).\\ filter(Node.name == 'grandchild 1') - + When using automatic aliasing, the ``from_joinpoint=True`` argument can allow a multi-node join to be broken into multiple calls to :meth:`~.Query.join`, so that each path along the way can be further filtered:: - + q = session.query(Node).\\ join("children", aliased=True).\\ filter(Node.name='child 1').\\ join("children", aliased=True, from_joinpoint=True).\\ filter(Node.name == 'grandchild 1') - + The filtering aliases above can then be reset back to the original ``Node`` entity using :meth:`~.Query.reset_joinpoint`:: - + q = session.query(Node).\\ join("children", "children", aliased=True).\\ filter(Node.name == 'grandchild 1').\\ reset_joinpoint().\\ filter(Node.name == 'parent 1) - + For an example of ``aliased=True``, see the distribution example :ref:`examples_xmlpersistence` which illustrates an XPath-like query system using algorithmic joins. - + :param *props: A collection of one or more join conditions, each consisting of a relationship-bound attribute or string relationship name representing an "on clause", or a single @@ -1590,18 +1591,18 @@ class Query(object): of True here will cause the join to be from the most recent joined target, rather than starting back from the original FROM clauses of the query. - + See also: - + :ref:`ormtutorial_joins` in the ORM tutorial. :ref:`inheritance_toplevel` for details on how :meth:`~.Query.join` is used for inheritance relationships. - + :func:`.orm.join` - a standalone ORM-level join function, used internally by :meth:`.Query.join`, which in previous SQLAlchemy versions was the primary ORM-level joining interface. - + """ aliased, from_joinpoint = kwargs.pop('aliased', False),\ kwargs.pop('from_joinpoint', False) @@ -1936,7 +1937,7 @@ class Query(object): def reset_joinpoint(self): """Return a new :class:`.Query`, where the "join point" has been reset back to the base FROM entities of the query. - + This method is usually used in conjunction with the ``aliased=True`` feature of the :meth:`~.Query.join` method. See the example in :meth:`~.Query.join` for how @@ -1957,7 +1958,7 @@ class Query(object): Mapped entities or plain :class:`~.Table` or other selectables can be sent here which will form the default FROM clause. - + See the example in :meth:`~.Query.join` for a typical usage of :meth:`~.Query.select_from`. @@ -2072,7 +2073,7 @@ class Query(object): SELECT HIGH_PRIORITY SQL_SMALL_RESULT ALL users.name AS users_name FROM users - + New in 0.7.7. """ @@ -2358,7 +2359,7 @@ class Query(object): this :class:`.Query` - if these do not correspond, unchecked errors will occur. The 'load' argument is the same as that of :meth:`.Session.merge`. - + For an example of how :meth:`~.Query.merge_result` is used, see the source code for the example :ref:`examples_caching`, where :meth:`~.Query.merge_result` is used to efficiently restore state @@ -2506,39 +2507,39 @@ class Query(object): def count(self): """Return a count of rows this Query would return. - + This generates the SQL for this Query as follows:: - + SELECT count(1) AS count_1 FROM ( SELECT ) AS anon_1 Note the above scheme is newly refined in 0.7 (as of 0.7b3). - + For fine grained control over specific columns to count, to skip the usage of a subquery or otherwise control of the FROM clause, or to use other aggregate functions, use :attr:`~sqlalchemy.sql.expression.func` expressions in conjunction with :meth:`~.Session.query`, i.e.:: - + from sqlalchemy import func - + # count User records, without # using a subquery. session.query(func.count(User.id)) - + # return count of user "id" grouped # by "name" session.query(func.count(User.id)).\\ group_by(User.name) from sqlalchemy import distinct - + # count distinct "name" values session.query(func.count(distinct(User.name))) - + """ col = sql.func.count(sql.literal_column('*')) return self.from_self(col).scalar() @@ -2587,88 +2588,11 @@ class Query(object): invokes :meth:`.SessionEvents.after_bulk_delete`. """ - #TODO: lots of duplication and ifs - probably needs to be - # refactored to strategies #TODO: cascades need handling. - if synchronize_session not in [False, 'evaluate', 'fetch']: - raise sa_exc.ArgumentError( - "Valid strategies for session " - "synchronization are False, 'evaluate' and " - "'fetch'") - self._no_select_modifiers("delete") - - self = self.enable_eagerloads(False) - - context = self._compile_context() - if len(context.statement.froms) != 1 or \ - not isinstance(context.statement.froms[0], schema.Table): - raise sa_exc.ArgumentError("Only deletion via a single table " - "query is currently supported") - primary_table = context.statement.froms[0] - - session = self.session - - if self._autoflush: - session._autoflush() - - if synchronize_session == 'evaluate': - try: - evaluator_compiler = evaluator.EvaluatorCompiler() - if self.whereclause is not None: - eval_condition = evaluator_compiler.process( - self.whereclause) - else: - def eval_condition(obj): - return True - - except evaluator.UnevaluatableError: - raise sa_exc.InvalidRequestError( - "Could not evaluate current criteria in Python. " - "Specify 'fetch' or False for the synchronize_session " - "parameter.") - - target_cls = self._mapper_zero().class_ - - #TODO: detect when the where clause is a trivial primary key match - objs_to_expunge = [ - obj for (cls, pk),obj in - session.identity_map.iteritems() - if issubclass(cls, target_cls) and - eval_condition(obj)] - - elif synchronize_session == 'fetch': - #TODO: use RETURNING when available - select_stmt = context.statement.with_only_columns( - primary_table.primary_key) - matched_rows = session.execute( - select_stmt, - params=self._params).fetchall() - - delete_stmt = sql.delete(primary_table, context.whereclause) - - result = session.execute(delete_stmt, params=self._params) - - if synchronize_session == 'evaluate': - session._remove_newly_deleted([attributes.instance_state(obj) - for obj in objs_to_expunge]) - elif synchronize_session == 'fetch': - target_mapper = self._mapper_zero() - for primary_key in matched_rows: - # TODO: inline this and call remove_newly_deleted - # once - identity_key = target_mapper.identity_key_from_primary_key( - list(primary_key)) - if identity_key in session.identity_map: - session._remove_newly_deleted( - [attributes.instance_state( - session.identity_map[identity_key] - )] - ) - - session.dispatch.after_bulk_delete(session, self, context, result) - - return result.rowcount + delete_op = persistence.BulkDelete.factory(self, synchronize_session) + delete_op.exec_() + return delete_op.rowcount def update(self, values, synchronize_session='evaluate'): """Perform a bulk update query. @@ -2722,114 +2646,10 @@ class Query(object): # fk assignments #TODO: cascades need handling. - if synchronize_session == 'expire': - util.warn_deprecated("The 'expire' value as applied to " - "the synchronize_session argument of " - "query.update() is now called 'fetch'") - synchronize_session = 'fetch' - - if synchronize_session not in [False, 'evaluate', 'fetch']: - raise sa_exc.ArgumentError( - "Valid strategies for session synchronization " - "are False, 'evaluate' and 'fetch'") - self._no_select_modifiers("update") - - self = self.enable_eagerloads(False) - - context = self._compile_context() - if len(context.statement.froms) != 1 or \ - not isinstance(context.statement.froms[0], schema.Table): - raise sa_exc.ArgumentError( - "Only update via a single table query is " - "currently supported") - primary_table = context.statement.froms[0] - - session = self.session - - if self._autoflush: - session._autoflush() - - if synchronize_session == 'evaluate': - try: - evaluator_compiler = evaluator.EvaluatorCompiler() - if self.whereclause is not None: - eval_condition = evaluator_compiler.process( - self.whereclause) - else: - def eval_condition(obj): - return True - - value_evaluators = {} - for key,value in values.iteritems(): - key = _attr_as_key(key) - value_evaluators[key] = evaluator_compiler.process( - expression._literal_as_binds(value)) - except evaluator.UnevaluatableError: - raise sa_exc.InvalidRequestError( - "Could not evaluate current criteria in Python. " - "Specify 'fetch' or False for the " - "synchronize_session parameter.") - target_cls = self._mapper_zero().class_ - matched_objects = [] - for (cls, pk),obj in session.identity_map.iteritems(): - evaluated_keys = value_evaluators.keys() - - if issubclass(cls, target_cls) and eval_condition(obj): - matched_objects.append(obj) - - elif synchronize_session == 'fetch': - select_stmt = context.statement.with_only_columns( - primary_table.primary_key) - matched_rows = session.execute( - select_stmt, - params=self._params).fetchall() - - update_stmt = sql.update(primary_table, context.whereclause, values) - - result = session.execute(update_stmt, params=self._params) - - if synchronize_session == 'evaluate': - target_cls = self._mapper_zero().class_ - states = set() - for obj in matched_objects: - state, dict_ = attributes.instance_state(obj),\ - attributes.instance_dict(obj) - - # only evaluate unmodified attributes - to_evaluate = state.unmodified.intersection( - evaluated_keys) - for key in to_evaluate: - dict_[key] = value_evaluators[key](obj) - - state.commit(dict_, list(to_evaluate)) - - # expire attributes with pending changes - # (there was no autoflush, so they are overwritten) - state.expire_attributes(dict_, - set(evaluated_keys). - difference(to_evaluate)) - states.add(state) - session._register_altered(states) - - elif synchronize_session == 'fetch': - target_mapper = self._mapper_zero() - - states = set([ - attributes.instance_state(session.identity_map[identity_key]) - for identity_key in [ - target_mapper.identity_key_from_primary_key( - list(primary_key)) - for primary_key in matched_rows - ] - ]) - attrib = [_attr_as_key(k) for k in values] - for state in states: - session._expire_state(state, attrib) - session._register_altered(states) - - session.dispatch.after_bulk_update(session, self, context, result) + update_op = persistence.BulkUpdate.factory(self, synchronize_session, values) + update_op.exec_() + return update_op.rowcount - return result.rowcount def _compile_context(self, labels=True): context = QueryContext(self) diff --git a/test/orm/test_update_delete.py b/test/orm/test_update_delete.py index 351733708c..af23cd1fc3 100644 --- a/test/orm/test_update_delete.py +++ b/test/orm/test_update_delete.py @@ -40,6 +40,18 @@ class UpdateDeleteTest(fixtures.MappedTest): mapper(User, users) + def test_illegal_eval(self): + User = self.classes.User + s = Session() + assert_raises_message( + exc.ArgumentError, + "Valid strategies for session synchronization " + "are 'evaluate', 'fetch', False", + s.query(User).update, + {}, + synchronize_session="fake" + ) + def test_illegal_operations(self): User = self.classes.User