if inherits is not None:
self.primarytable = inherits.primarytable
- # inherit_condition is optional since the join can figure it out
+ # inherit_condition is optional.
+ if inherit_condition is None:
+ # figure out inherit condition from our table to the immediate table
+ # of the inherited mapper, not its full table which could pull in other
+ # stuff we dont want (allows test/inheritance.InheritTest4 to pass)
+ inherit_condition = sql.join(inherits.noninherited_table, table).onclause
self.table = sql.join(inherits.table, table, inherit_condition)
+ #print "inherit condition", str(self.table.onclause)
+
+ # generate sync rules. similarly to creating the on clause, specify a
+ # stricter set of tables to create "sync rules" by,based on the immediate
+ # inherited table, rather than all inherited tables
self._synchronizer = sync.ClauseSynchronizer(self, self, sync.ONETOMANY)
- self._synchronizer.compile(self.table.onclause, inherits.tables, TableFinder(table))
+ self._synchronizer.compile(self.table.onclause, util.HashSet([inherits.noninherited_table]), TableFinder(table))
+ # the old rule
+ #self._synchronizer.compile(self.table.onclause, inherits.tables, TableFinder(table))
+
self.inherits = inherits
self.noninherited_table = table
else:
def __init__(self, table, check_columns=False):
self.tables = []
self.check_columns = check_columns
- table.accept_visitor(self)
+ if table is not None:
+ table.accept_visitor(self)
def visit_table(self, table):
self.tables.append(table)
def __len__(self):
def __contains__(self, obj):
return obj in self.tables
def __add__(self, obj):
- return self.tables + obj
+ return self.tables + list(obj)
def visit_column(self, column):
if self.check_columns:
column.table.accept_visitor(self)
self.syncrules = []
def compile(self, sqlclause, source_tables, target_tables, issecondary=None):
- def check_for_table(binary, l):
- for col in [binary.left, binary.right]:
- if col.table in l:
- return col
+ def check_for_table(binary, list1, list2):
+ #print "check for table", str(binary), [str(c) for c in l]
+ if binary.left.table in list1 and binary.right.table in list2:
+ return (binary.left, binary.right)
+ elif binary.right.table in list1 and binary.left.table in list2:
+ return (binary.right, binary.left)
else:
- return None
-
+ return (None, None)
+
def compile_binary(binary):
"""assembles a SyncRule given a single binary condition"""
if binary.operator != '=' or not isinstance(binary.left, schema.Column) or not isinstance(binary.right, schema.Column):
else:
raise AssertionError("assert failed")
else:
- pt = check_for_table(binary, source_tables)
- tt = check_for_table(binary, target_tables)
+ (pt, tt) = check_for_table(binary, source_tables, target_tables)
#print "OK", binary, [t.name for t in source_tables], [t.name for t in target_tables]
if pt and tt:
if self.direction == ONETOMANY:
self.issecondary = issecondary
self.dest_mapper = dest_mapper
self.dest_column = dest_column
- #print "SyncRule", source_mapper, source_column, dest_column, dest_mapper, direction
+ #print "SyncRule", source_mapper, source_column, dest_column, dest_mapper
def execute(self, source, dest, obj, child, clearkeys):
if source is None:
pass
class InheritTest(testbase.AssertMixin):
- def setUpAll(self):
- global principals
- global users
- global groups
- global user_group_map
- principals = Table(
- 'principals',
- testbase.db,
- Column('principal_id', Integer, Sequence('principal_id_seq', optional=False), primary_key=True),
- Column('name', String(50), nullable=False),
- )
-
- users = Table(
- 'prin_users',
- testbase.db,
- Column('principal_id', Integer, ForeignKey('principals.principal_id'), primary_key=True),
- Column('password', String(50), nullable=False),
- Column('email', String(50), nullable=False),
- Column('login_id', String(50), nullable=False),
-
- )
-
- groups = Table(
- 'prin_groups',
- testbase.db,
- Column( 'principal_id', Integer, ForeignKey('principals.principal_id'), primary_key=True),
-
- )
-
- user_group_map = Table(
- 'prin_user_group_map',
- testbase.db,
- Column('user_id', Integer, ForeignKey( "prin_users.principal_id"), primary_key=True ),
- Column('group_id', Integer, ForeignKey( "prin_groups.principal_id"), primary_key=True ),
- #Column('user_id', Integer, ForeignKey( "prin_users.principal_id"), ),
- #Column('group_id', Integer, ForeignKey( "prin_groups.principal_id"), ),
-
- )
-
- principals.create()
- users.create()
- groups.create()
- user_group_map.create()
- def tearDownAll(self):
- user_group_map.drop()
- groups.drop()
- users.drop()
- principals.drop()
- testbase.db.tables.clear()
- def setUp(self):
- objectstore.clear()
- clear_mappers()
-
- def testbasic(self):
- assign_mapper( Principal, principals )
- assign_mapper(
- User,
- users,
- inherits=Principal.mapper
- )
-
- assign_mapper(
- Group,
- groups,
- inherits=Principal.mapper,
- properties=dict( users = relation(User.mapper, user_group_map, lazy=True, backref="groups") )
- )
-
- g = Group(name="group1")
- g.users.append(User(name="user1", password="pw", email="foo@bar.com", login_id="lg1"))
-
- objectstore.commit()
+ """deals with inheritance and many-to-many relationships"""
+ def setUpAll(self):
+ global principals
+ global users
+ global groups
+ global user_group_map
+ principals = Table(
+ 'principals',
+ testbase.db,
+ Column('principal_id', Integer, Sequence('principal_id_seq', optional=False), primary_key=True),
+ Column('name', String(50), nullable=False),
+ )
+
+ users = Table(
+ 'prin_users',
+ testbase.db,
+ Column('principal_id', Integer, ForeignKey('principals.principal_id'), primary_key=True),
+ Column('password', String(50), nullable=False),
+ Column('email', String(50), nullable=False),
+ Column('login_id', String(50), nullable=False),
+
+ )
+
+ groups = Table(
+ 'prin_groups',
+ testbase.db,
+ Column( 'principal_id', Integer, ForeignKey('principals.principal_id'), primary_key=True),
+
+ )
+
+ user_group_map = Table(
+ 'prin_user_group_map',
+ testbase.db,
+ Column('user_id', Integer, ForeignKey( "prin_users.principal_id"), primary_key=True ),
+ Column('group_id', Integer, ForeignKey( "prin_groups.principal_id"), primary_key=True ),
+ #Column('user_id', Integer, ForeignKey( "prin_users.principal_id"), ),
+ #Column('group_id', Integer, ForeignKey( "prin_groups.principal_id"), ),
+
+ )
+
+ principals.create()
+ users.create()
+ groups.create()
+ user_group_map.create()
+ def tearDownAll(self):
+ user_group_map.drop()
+ groups.drop()
+ users.drop()
+ principals.drop()
+ testbase.db.tables.clear()
+ def setUp(self):
+ objectstore.clear()
+ clear_mappers()
+
+ def testbasic(self):
+ assign_mapper( Principal, principals )
+ assign_mapper(
+ User,
+ users,
+ inherits=Principal.mapper
+ )
+
+ assign_mapper(
+ Group,
+ groups,
+ inherits=Principal.mapper,
+ properties=dict( users = relation(User.mapper, user_group_map, lazy=True, backref="groups") )
+ )
+ g = Group(name="group1")
+ g.users.append(User(name="user1", password="pw", email="foo@bar.com", login_id="lg1"))
+
+ objectstore.commit()
+ # TODO: put an assertion
+
class InheritTest2(testbase.AssertMixin):
+ """deals with inheritance and many-to-many relationships"""
def setUpAll(self):
engine = testbase.db
global foo, bar, foo_bar
)
class InheritTest3(testbase.AssertMixin):
+ """deals with inheritance and many-to-many relationships"""
def setUpAll(self):
engine = testbase.db
global foo, bar, blub, bar_foo, blub_bar, blub_foo,tables
b.foos.append(Foo("foo #1"))
b.foos.append(Foo("foo #2"))
objectstore.commit()
+ compare = repr(b) + repr(b.foos)
objectstore.clear()
l = Bar.mapper.select()
- print l[0], l[0].foos
+ self.echo(repr(l[0]) + repr(l[0].foos))
+ self.assert_(repr(l[0]) + repr(l[0].foos) == compare)
def testadvanced(self):
class Foo(object):
self.echo(x)
self.assert_(repr(x) == compare)
+class InheritTest4(testbase.AssertMixin):
+ """deals with inheritance and one-to-many relationships"""
+ def setUpAll(self):
+ engine = testbase.db
+ global foo, bar, blub, tables
+ engine.engine.echo = 'debug'
+ # the 'data' columns are to appease SQLite which cant handle a blank INSERT
+ foo = Table('foo', engine,
+ Column('id', Integer, Sequence('foo_seq'), primary_key=True),
+ Column('data', String(20)))
+
+ bar = Table('bar', engine,
+ Column('id', Integer, ForeignKey('foo.id'), primary_key=True),
+ Column('data', String(20)))
+
+ blub = Table('blub', engine,
+ Column('id', Integer, ForeignKey('bar.id'), primary_key=True),
+ Column('foo_id', Integer, ForeignKey('foo.id'), nullable=False),
+ Column('data', String(20)))
+
+ tables = [foo, bar, blub]
+ for table in tables:
+ table.create()
+ def tearDownAll(self):
+ for table in reversed(tables):
+ table.drop()
+ testbase.db.tables.clear()
+
+ def tearDown(self):
+ for table in reversed(tables):
+ table.delete().execute()
+
+ def testbasic(self):
+ class Foo(object):
+ def __init__(self, data=None):
+ self.data = data
+ def __repr__(self):
+ return "Foo id %d, data %s" % (self.id, self.data)
+ Foo.mapper = mapper(Foo, foo)
+
+ class Bar(Foo):
+ def __repr__(self):
+ return "Bar id %d, data %s" % (self.id, self.data)
+
+ Bar.mapper = mapper(Bar, bar, inherits=Foo.mapper)
+
+ class Blub(Bar):
+ def __repr__(self):
+ return "Blub id %d, data %s" % (self.id, self.data)
+ Blub.mapper = mapper(Blub, blub, inherits=Bar.mapper, properties={
+ # bug was raised specifically based on the order of cols in the join....
+# 'parent_foo':relation(Foo.mapper, primaryjoin=blub.c.foo_id==foo.c.id)
+# 'parent_foo':relation(Foo.mapper, primaryjoin=foo.c.id==blub.c.foo_id)
+ 'parent_foo':relation(Foo.mapper)
+ })
+
+ b1 = Blub("blub #1")
+ b2 = Blub("blub #2")
+ f = Foo("foo #1")
+ b1.parent_foo = f
+ b2.parent_foo = f
+ objectstore.commit()
+ compare = repr(b1) + repr(b2) + repr(b1.parent_foo) + repr(b2.parent_foo)
+ objectstore.clear()
+ l = Blub.mapper.select()
+ result = repr(l[0]) + repr(l[1]) + repr(l[0].parent_foo) + repr(l[1].parent_foo)
+ self.echo(result)
+ self.assert_(compare == result)
+ self.assert_(l[0].parent_foo.data == 'foo #1' and l[1].parent_foo.data == 'foo #1')
if __name__ == "__main__":
testbase.main()