http://www.sqlalchemy.org/trac/wiki/UsageRecipes/PreFilteredQuery
for an example.
+ - Fixed recursion issue which occured if a mapped object's
+ `__len__()` or `__nonzero__()` method resulted in state
+ changes. [ticket:1501]
+
+ - Fixed incorrect exception raise in
+ Weak/StrongIdentityMap.add()
+ [ticket:1506]
+
- Fixed a somewhat hypothetical issue which would result
in the wrong primary key being calculated for a mapper
using the old polymorphic_union function - but this
def add(self, state):
if state.key in self:
if dict.__getitem__(self, state.key) is not state:
- raise AssertionError("A conflicting state is already present in the identity map for key %r" % state.key)
+ raise AssertionError("A conflicting state is already present in the identity map for key %r" % (state.key, ))
else:
dict.__setitem__(self, state.key, state)
self._manage_incoming_state(state)
dict.__setitem__(self, state.key, state.obj())
self._manage_incoming_state(state)
-
+
def add(self, state):
- dict.__setitem__(self, state.key, state.obj())
- self._manage_incoming_state(state)
-
+ if state.key in self:
+ if attributes.instance_state(dict.__getitem__(self, state.key)) is not state:
+ raise AssertionError("A conflicting state is already present in the identity map for key %r" % (state.key, ))
+ else:
+ dict.__setitem__(self, state.key, state.obj())
+ self._manage_incoming_state(state)
+
def remove(self, state):
if attributes.instance_state(dict.pop(self, state.key)) is not state:
raise AssertionError("State %s is not present in this identity map" % state)
instance_dict._modified.add(self)
self.modified = True
- if not self._strong_obj:
+ if self._strong_obj is None:
self._strong_obj = self.obj()
def commit(self, dict_, keys):
def assert_raises(except_cls, callable_, *args, **kw):
try:
callable_(*args, **kw)
- assert False, "Callable did not raise an exception"
+ success = False
except except_cls, e:
- pass
+ success = True
+
+ # assert outside the block so it works for AssertionError too !
+ assert success, "Callable did not raise an exception"
def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
try:
sa.orm.eagerload_all('h3s.h1s')).all()
eq_(len(h1s), 5)
+ @testing.resolve_artifact_names
+ def test_nonzero_len_recursion(self):
+ class H1(object):
+ def __len__(self):
+ return len(self.get_value())
+
+ def get_value(self):
+ self.value = "foobar"
+ return self.value
+
+ class H2(object):
+ def __nonzero__(self):
+ return bool(self.get_value())
+ def get_value(self):
+ self.value = "foobar"
+ return self.value
+
+ mapper(H1, ht1)
+ mapper(H2, ht1)
+
+ h1 = H1()
+ h1.value = "Asdf"
+ h1.value = "asdf asdf" # ding
+
+ h2 = H2()
+ h2.value = "Asdf"
+ h2.value = "asdf asdf" # ding
+
class MagicNamesTest(_base.MappedTest):
@classmethod
import inspect
import pickle
from sqlalchemy.orm import create_session, sessionmaker, attributes
+from sqlalchemy.orm.attributes import instance_state
import sqlalchemy as sa
from sqlalchemy.test import engines, testing, config
from sqlalchemy import Integer, String, Sequence
assert not s.dirty
assert not s.identity_map
-
+
+ @testing.resolve_artifact_names
+ def test_identity_conflict(self):
+ mapper(User, users)
+ for s in (
+ create_session(),
+ create_session(weak_identity_map=False),
+ ):
+ users.delete().execute()
+ u1 = User(name="ed")
+ s.add(u1)
+ s.flush()
+ s.expunge(u1)
+ u2 = s.query(User).first()
+ s.expunge(u2)
+ s.identity_map.add(sa.orm.attributes.instance_state(u1))
+
+ assert_raises(AssertionError, s.identity_map.add, sa.orm.attributes.instance_state(u2))
+
+
@testing.resolve_artifact_names
def test_weakref_with_cycles_o2m(self):
s = sessionmaker()()