from sqlalchemy.testing import assert_raises, assert_raises_message
-from sqlalchemy.orm import interfaces, util
+from sqlalchemy.orm import util
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy.orm import aliased
from sqlalchemy.orm import mapper, create_session
-from sqlalchemy import testing
from sqlalchemy.testing import fixtures
from test.orm import _fixtures
-from sqlalchemy.testing import eq_
-
+from sqlalchemy.testing import eq_, is_
+from sqlalchemy.orm.util import PathRegistry
+from sqlalchemy import inspect
class AliasedClassTest(fixtures.TestBase):
def point_map(self, cls):
key = util.identity_key(User, row=row)
eq_(key, (User, (1,)))
-
+class PathRegistryTest(_fixtures.FixtureTest):
+ run_setup_mappers = 'once'
+ run_inserts = None
+ run_deletes = None
+
+
+ @classmethod
+ def setup_mappers(cls):
+ cls._setup_stock_mapping()
+
+ def test_root_registry(self):
+ umapper = inspect(self.classes.User)
+ is_(
+ util.RootRegistry()[umapper],
+ umapper._sa_path_registry
+ )
+ eq_(
+ util.RootRegistry()[umapper],
+ util.PathRegistry.coerce((umapper,))
+ )
+
+ def test_expand(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ path = PathRegistry.coerce((umapper,))
+
+ eq_(
+ path['addresses'][amapper]['email_address'],
+ PathRegistry.coerce((umapper, 'addresses',
+ amapper, 'email_address'))
+ )
+
+ def test_entity_boolean(self):
+ umapper = inspect(self.classes.User)
+ path = PathRegistry.coerce((umapper,))
+ is_(bool(path), True)
+
+ def test_key_boolean(self):
+ umapper = inspect(self.classes.User)
+ path = PathRegistry.coerce((umapper, 'addresses'))
+ is_(bool(path), True)
+
+ def test_aliased_class(self):
+ User = self.classes.User
+ ua = aliased(User)
+ path = PathRegistry.coerce((ua, 'addresses'))
+ assert path.parent.is_aliased_class
+
+ def test_indexed_entity(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ path = PathRegistry.coerce((umapper, 'addresses',
+ amapper, 'email_address'))
+ is_(path[0], umapper)
+ is_(path[2], amapper)
+
+ def test_indexed_key(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ path = PathRegistry.coerce((umapper, 'addresses',
+ amapper, 'email_address'))
+ eq_(path[1], 'addresses')
+ eq_(path[3], 'email_address')
+
+ def test_slice(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ path = PathRegistry.coerce((umapper, 'addresses',
+ amapper, 'email_address'))
+ eq_(path[1:3], ('addresses', amapper))
+
+ def test_addition(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((amapper, 'email_address'))
+ eq_(
+ p1 + p2,
+ PathRegistry.coerce((umapper, 'addresses',
+ amapper, 'email_address'))
+ )
+
+ def test_length(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ pneg1 = PathRegistry.coerce(())
+ p0 = PathRegistry.coerce((umapper,))
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p3 = PathRegistry.coerce((umapper, 'addresses',
+ amapper, 'email_address'))
+
+ eq_(len(pneg1), 0)
+ eq_(len(p0), 1)
+ eq_(len(p1), 2)
+ eq_(len(p2), 3)
+ eq_(len(p3), 4)
+ eq_(pneg1.length, 0)
+ eq_(p0.length, 1)
+ eq_(p1.length, 2)
+ eq_(p2.length, 3)
+ eq_(p3.length, 4)
+
+ def test_eq(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((umapper, 'addresses'))
+ p3 = PathRegistry.coerce((umapper, 'other'))
+ p4 = PathRegistry.coerce((amapper, 'addresses'))
+ p5 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p6 = PathRegistry.coerce((amapper, 'user', umapper, 'addresses'))
+ p7 = PathRegistry.coerce((amapper, 'user', umapper, 'addresses',
+ amapper, 'email_address'))
+
+ is_(p1 == p2, True)
+ is_(p1 == p3, False)
+ is_(p1 == p4, False)
+ is_(p1 == p5, False)
+ is_(p6 == p7, False)
+ is_(p6 == p7.parent.parent, True)
+
+ is_(p1 != p2, False)
+ is_(p1 != p3, True)
+ is_(p1 != p4, True)
+ is_(p1 != p5, True)
+
+ def test_contains_mapper(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ assert p1.contains_mapper(umapper)
+ assert not p1.contains_mapper(amapper)
+
+ def _registry(self):
+ class Reg(dict):
+ @property
+ def _attributes(self):
+ return self
+ return Reg()
+
+ def test_path(self):
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p3 = PathRegistry.coerce((amapper, 'email_address'))
+
+ eq_(
+ p1.path, (umapper, 'addresses')
+ )
+ eq_(
+ p2.path, (umapper, 'addresses', amapper)
+ )
+ eq_(
+ p3.path, (amapper, 'email_address')
+ )
+
+ def test_registry_set(self):
+ reg = self._registry()
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p3 = PathRegistry.coerce((amapper, 'email_address'))
+
+ p1.set(reg, "p1key", "p1value")
+ p2.set(reg, "p2key", "p2value")
+ p3.set(reg, "p3key", "p3value")
+ eq_(
+ reg,
+ {
+ ('p1key', p1.path): 'p1value',
+ ('p2key', p2.path): 'p2value',
+ ('p3key', p3.path): 'p3value',
+ }
+ )
+
+ def test_registry_get(self):
+ reg = self._registry()
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p3 = PathRegistry.coerce((amapper, 'email_address'))
+ reg.update(
+ {
+ ('p1key', p1.path): 'p1value',
+ ('p2key', p2.path): 'p2value',
+ ('p3key', p3.path): 'p3value',
+ }
+ )
+
+ eq_(p1.get(reg, "p1key"), "p1value")
+ eq_(p2.get(reg, "p2key"), "p2value")
+ eq_(p2.get(reg, "p1key"), None)
+ eq_(p3.get(reg, "p3key"), "p3value")
+ eq_(p3.get(reg, "p1key"), None)
+
+ def test_registry_contains(self):
+ reg = self._registry()
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p3 = PathRegistry.coerce((amapper, 'email_address'))
+ reg.update(
+ {
+ ('p1key', p1.path): 'p1value',
+ ('p2key', p2.path): 'p2value',
+ ('p3key', p3.path): 'p3value',
+ }
+ )
+ assert p1.contains(reg, "p1key")
+ assert not p1.contains(reg, "p2key")
+ assert p3.contains(reg, "p3key")
+ assert not p2.contains(reg, "fake")
+
+ def test_registry_setdefault(self):
+ reg = self._registry()
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+
+ p1 = PathRegistry.coerce((umapper, 'addresses'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ reg.update(
+ {
+ ('p1key', p1.path): 'p1value',
+ }
+ )
+
+ p1.setdefault(reg, "p1key", "p1newvalue_a")
+ p1.setdefault(reg, "p1key_new", "p1newvalue_b")
+ p2.setdefault(reg, "p2key", "p2newvalue")
+ eq_(
+ reg,
+ {
+ ('p1key', p1.path): 'p1value',
+ ('p1key_new', p1.path): 'p1newvalue_b',
+ ('p2key', p2.path): 'p2newvalue',
+ }
+ )
+
+ def test_serialize(self):
+ User = self.classes.User
+ Address = self.classes.Address
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+
+ p1 = PathRegistry.coerce((umapper, 'addresses', amapper,
+ 'email_address'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p3 = PathRegistry.coerce((umapper, 'addresses'))
+ eq_(
+ p1.serialize(),
+ [(User, "addresses"), (Address, "email_address")]
+ )
+ eq_(
+ p2.serialize(),
+ [(User, "addresses"), (Address, None)]
+ )
+ eq_(
+ p3.serialize(),
+ [(User, "addresses")]
+ )
+
+ def test_deseralize(self):
+ User = self.classes.User
+ Address = self.classes.Address
+ umapper = inspect(self.classes.User)
+ amapper = inspect(self.classes.Address)
+
+
+ p1 = PathRegistry.coerce((umapper, 'addresses', amapper,
+ 'email_address'))
+ p2 = PathRegistry.coerce((umapper, 'addresses', amapper))
+ p3 = PathRegistry.coerce((umapper, 'addresses'))
+
+ eq_(
+ PathRegistry.deserialize([(User, "addresses"),
+ (Address, "email_address")]),
+ p1
+ )
+ eq_(
+ PathRegistry.deserialize([(User, "addresses"), (Address, None)]),
+ p2
+ )
+ eq_(
+ PathRegistry.deserialize([(User, "addresses")]),
+ p3
+ )