Primary key changing capabilities and passive/non-passive cascading updates.
"""
-from sqlalchemy.test.testing import eq_, ne_, assert_raises, assert_raises_message
+from sqlalchemy.test.testing import eq_, ne_, \
+ assert_raises, assert_raises_message
import sqlalchemy as sa
from sqlalchemy.test import testing
from sqlalchemy import Integer, String, ForeignKey, Unicode
addresses = Table('addresses', metadata,
Column('email', String(50), primary_key=True),
- Column('username', String(50), ForeignKey('users.username', **fk_args)),
+ Column('username', String(50),
+ ForeignKey('users.username', **fk_args)),
test_needs_fk=True)
items = Table('items', metadata,
test_needs_fk=True)
users_to_items = Table('users_to_items', metadata,
- Column('username', String(50), ForeignKey('users.username', **fk_args), primary_key=True),
- Column('itemname', String(50), ForeignKey('items.itemname', **fk_args), primary_key=True),
+ Column('username', String(50),
+ ForeignKey('users.username', **fk_args),
+ primary_key=True),
+ Column('itemname', String(50),
+ ForeignKey('items.itemname', **fk_args),
+ primary_key=True),
test_needs_fk=True)
@classmethod
assert u1.addresses[0].username == 'ed'
sess.expunge_all()
- eq_([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+ eq_([Address(username='ed'), Address(username='ed')],
+ sess.query(Address).all())
u1 = sess.query(User).get('ed')
u1.username = 'jack'
def go():
sess.flush()
if not passive_updates:
- self.assert_sql_count(testing.db, go, 4) # test passive_updates=False;
- #load addresses, update user, update 2 addresses
+ # test passive_updates=False;
+ #load addresses, update user, update 2 addresses
+ self.assert_sql_count(testing.db, go, 4)
else:
- self.assert_sql_count(testing.db, go, 1) # test passive_updates=True; update user
+ # test passive_updates=True; update user
+ self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
assert User(username='jack', addresses=[
Address(username='jack'),
assert a1.username == a2.username == 'ed'
sess.expunge_all()
- eq_([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+ eq_([Address(username='ed'), Address(username='ed')],
+ sess.query(Address).all())
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
@testing.fails_on('oracle', 'oracle doesnt support ON UPDATE CASCADE')
@testing.resolve_artifact_names
def _test_onetoone(self, passive_updates):
mapper(User, users, properties={
- "address":relationship(Address, passive_updates=passive_updates, uselist=False)
+ "address":relationship(Address, passive_updates=passive_updates,
+ uselist=False)
})
mapper(Address, addresses)
self.assert_sql_count(testing.db, go, 3)
eq_([Address(username='ed'), Address(username='ed')], [ad1, ad2])
sess.expunge_all()
- eq_([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+ eq_([Address(username='ed'), Address(username='ed')],
+ sess.query(Address).all())
u1 = sess.query(User).get('ed')
assert len(u1.addresses) == 2 # load addresses
else:
self.assert_sql_count(testing.db, go, 3)
sess.expunge_all()
- eq_([Address(username='fred'), Address(username='fred')], sess.query(Address).all())
+ eq_([Address(username='fred'), Address(username='fred')],
+ sess.query(Address).all())
@testing.fails_on('sqlite', 'sqlite doesnt support ON UPDATE CASCADE')
def test_manytomany_passive(self):
self._test_manytomany(True)
- # mysqldb executemany() of the association table fails to report the correct row count
- @testing.fails_if(lambda: testing.against('mysql') and not testing.against('+zxjdbc'))
+ # mysqldb executemany() of the association table fails to
+ # report the correct row count
+ @testing.fails_if(lambda: testing.against('mysql')
+ and not testing.against('+zxjdbc'))
def test_manytomany_nonpassive(self):
self._test_manytomany(False)
@testing.resolve_artifact_names
def _test_manytomany(self, passive_updates):
mapper(User, users, properties={
- 'items':relationship(Item, secondary=users_to_items, backref='users',
+ 'items':relationship(Item, secondary=users_to_items,
+ backref='users',
passive_updates=passive_updates)})
mapper(Item, items)
eq_(sess.query(User).count(), 2)
class ReversePKsTest(_base.MappedTest):
- """reverse the primary keys of two entities and ensure bookkeeping succeeds."""
+ """reverse the primary keys of two entities and ensure bookkeeping
+ succeeds."""
@classmethod
class SelfReferentialTest(_base.MappedTest):
- __unsupported_on__ = ('mssql','mysql') # mssql, mysql don't allow
- # ON UPDATE on self-referential keys
+ # mssql, mysql don't allow
+ # ON UPDATE on self-referential keys
+ __unsupported_on__ = ('mssql','mysql')
@classmethod
def define_tables(cls, metadata):
mapper(Node, nodes, properties={
'children': relationship(Node,
backref=sa.orm.backref('parentnode',
- remote_side=nodes.c.name,
- passive_updates=False),
+ remote_side=nodes.c.name,
+ passive_updates=False),
passive_updates=False)})
sess = create_session()
fk_args = dict(onupdate='cascade')
Table('users', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('username', String(50), unique=True),
Column('fullname', String(100)),
test_needs_fk=True)
Table('addresses', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('email', String(50)),
Column('username', String(50),
ForeignKey('users.username', **fk_args)),
@testing.resolve_artifact_names
def _test_onetomany(self, passive_updates):
mapper(User, users, properties={
- 'addresses':relationship(Address, passive_updates=passive_updates)})
+ 'addresses':relationship(Address,
+ passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
sess.flush()
a1 = u1.addresses[0]
- eq_(sa.select([addresses.c.username]).execute().fetchall(), [('jack',), ('jack',)])
+ eq_(sa.select([addresses.c.username]).execute().fetchall(),
+ [('jack',), ('jack',)])
assert sess.query(Address).get(a1.id) is u1.addresses[0]
u1.username = 'ed'
sess.flush()
assert u1.addresses[0].username == 'ed'
- eq_(sa.select([addresses.c.username]).execute().fetchall(), [('ed',), ('ed',)])
+ eq_(sa.select([addresses.c.username]).execute().fetchall(),
+ [('ed',), ('ed',)])
sess.expunge_all()
- eq_([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
+ eq_([Address(username='ed'), Address(username='ed')],
+ sess.query(Address).all())
u1 = sess.query(User).get(u1.id)
u1.username = 'jack'
def go():
sess.flush()
if not passive_updates:
- self.assert_sql_count(testing.db, go, 4) # test passive_updates=False; load addresses, update user, update 2 addresses
+ # test passive_updates=False; load addresses,
+ # update user, update 2 addresses
+ self.assert_sql_count(testing.db, go, 4)
else:
- self.assert_sql_count(testing.db, go, 1) # test passive_updates=True; update user
+ # test passive_updates=True; update user
+ self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
- assert User(username='jack', addresses=[Address(username='jack'), Address(username='jack')]) == sess.query(User).get(u1.id)
+ assert User(username='jack',
+ addresses=[Address(username='jack'),
+ Address(username='jack')]) == \
+ sess.query(User).get(u1.id)
sess.expunge_all()
u1 = sess.query(User).get(u1.id)
a1 = sess.query(Address).get(a1.id)
eq_(a1.username, None)
- eq_(sa.select([addresses.c.username]).execute().fetchall(), [(None,), (None,)])
+ eq_(sa.select([addresses.c.username]).execute().fetchall(),
+ [(None,), (None,)])
u1 = sess.query(User).get(u1.id)
eq_(User(username='fred', fullname='jack'), u1)
class CascadeToFKPKTest(_base.MappedTest, testing.AssertsCompiledSQL):
- """A primary key mutation cascades onto a foreign key that is itself a primary key."""
+ """A primary key mutation cascades onto a foreign key that is itself a
+ primary key."""
@classmethod
def define_tables(cls, metadata):
"""
mapper(User, users, properties={
- 'addresses':relationship(Address, passive_updates=passive_updates)})
+ 'addresses':relationship(Address,
+ passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
# being updated as well, since this is a row switch.
self.assert_sql_execution(testing.db,
sess.flush,
- CompiledSQL("UPDATE addresses SET etc=:etc WHERE "
- "addresses.username = :addresses_username AND"
- " addresses.email = :addresses_email",
- {'etc': 'foo', 'addresses_username':'ed', 'addresses_email':'ed@host1'} ),
+ CompiledSQL(
+ "UPDATE addresses SET etc=:etc WHERE "
+ "addresses.username = :addresses_username AND"
+ " addresses.email = :addresses_email",
+ {'etc': 'foo', 'addresses_username':'ed',
+ 'addresses_email':'ed@host1'} ),
)
"""
mapper(User, users, properties={
- 'addresses':relationship(Address, passive_updates=passive_updates)})
+ 'addresses':relationship(Address,
+ passive_updates=passive_updates)})
mapper(Address, addresses)
sess = create_session()
- a1, a2 = Address(username='ed', email='ed@host1'), Address(username='ed', email='ed@host2')
+ a1, a2 = Address(username='ed', email='ed@host1'),\
+ Address(username='ed', email='ed@host2')
u1 = User(username='ed', addresses=[a1, a2])
sess.add(u1)
sess.flush()
eq_(a1.username, 'ed')
eq_(a2.username, 'ed')
- eq_(sa.select([addresses.c.username]).execute().fetchall(), [('ed',), ('ed',)])
+ eq_(sa.select([addresses.c.username]).execute().fetchall(),
+ [('ed',), ('ed',)])
u1.username = 'jack'
a2.email='ed@host3'
eq_(a1.username, 'jack')
eq_(a2.username, 'jack')
- eq_(sa.select([addresses.c.username]).execute().fetchall(), [('jack',), ('jack', )])
+ eq_(sa.select([addresses.c.username]).execute().fetchall(),
+ [('jack',), ('jack', )])
class JoinedInheritanceTest(_base.MappedTest):
"""Test cascades of pk->pk/fk on joined table inh."""
- __unsupported_on__ = ('mssql',) # mssql doesn't allow ON UPDATE on self-referential keys
+ # mssql doesn't allow ON UPDATE on self-referential keys
+ __unsupported_on__ = ('mssql',)
@classmethod
def define_tables(cls, metadata):
test_needs_fk=True)
Table('engineer', metadata,
- Column('name', String(50), ForeignKey('person.name', **fk_args), primary_key=True),
+ Column('name', String(50), ForeignKey('person.name', **fk_args),
+ primary_key=True),
Column('primary_language', String(50)),
- Column('boss_name', String(50), ForeignKey('manager.name', **fk_args)),
- test_needs_fk=True
+ Column('boss_name', String(50),
+ ForeignKey('manager.name', **fk_args)),
+ test_needs_fk=True
)
Table('manager', metadata,
- Column('name', String(50), ForeignKey('person.name', **fk_args), primary_key=True),
+ Column('name', String(50),
+ ForeignKey('person.name', **fk_args),
+ primary_key=True),
Column('paperwork', String(50)),
test_needs_fk=True
)
self._test_fk(True)
# PG etc. need passive=True to allow PK->PK cascade
- @testing.fails_on_everything_except('sqlite', 'mysql+zxjdbc', 'postgresql+zxjdbc')
+ @testing.fails_on_everything_except('sqlite', 'mysql+zxjdbc',
+ 'postgresql+zxjdbc')
def test_fk_nonpassive(self):
self._test_fk(False)
@testing.resolve_artifact_names
def _test_pk(self, passive_updates):
mapper(Person, person, polymorphic_on=person.c.type,
- polymorphic_identity='person', passive_updates=passive_updates)
- mapper(Engineer, engineer, inherits=Person, polymorphic_identity='engineer', properties={
+ polymorphic_identity='person',
+ passive_updates=passive_updates)
+ mapper(Engineer, engineer, inherits=Person,
+ polymorphic_identity='engineer', properties={
'boss':relationship(Manager,
primaryjoin=manager.c.name==engineer.c.boss_name,
passive_updates=passive_updates
)
})
- mapper(Manager, manager, inherits=Person, polymorphic_identity='manager')
+ mapper(Manager, manager, inherits=Person,
+ polymorphic_identity='manager')
sess = sa.orm.sessionmaker()()
@testing.resolve_artifact_names
def _test_fk(self, passive_updates):
mapper(Person, person, polymorphic_on=person.c.type,
- polymorphic_identity='person', passive_updates=passive_updates)
- mapper(Engineer, engineer, inherits=Person, polymorphic_identity='engineer', properties={
+ polymorphic_identity='person',
+ passive_updates=passive_updates)
+ mapper(Engineer, engineer, inherits=Person,
+ polymorphic_identity='engineer', properties={
'boss':relationship(Manager,
primaryjoin=manager.c.name==engineer.c.boss_name,
passive_updates=passive_updates
)
})
- mapper(Manager, manager, inherits=Person, polymorphic_identity='manager')
+ mapper(Manager, manager, inherits=Person,
+ polymorphic_identity='manager')
sess = sa.orm.sessionmaker()()
print postsort_actions
eq_(len(postsort_actions), expected, postsort_actions)
-class UOWTest(_fixtures.FixtureTest, testing.AssertsExecutionResults, AssertsUOW):
+class UOWTest(_fixtures.FixtureTest,
+ testing.AssertsExecutionResults, AssertsUOW):
run_inserts = None
class RudimentaryFlushTest(UOWTest):
testing.db,
sess.flush,
CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE addresses.id = :addresses_id",
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
lambda ctx: [{u'addresses_id': a1.id, 'user_id': None}]
),
CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE addresses.id = :addresses_id",
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
lambda ctx: [{u'addresses_id': a2.id, 'user_id': None}]
),
CompiledSQL(
testing.db,
sess.flush,
CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE addresses.id = :addresses_id",
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
lambda ctx: [{u'addresses_id': a1.id, 'user_id': None}]
),
CompiledSQL(
- "UPDATE addresses SET user_id=:user_id WHERE addresses.id = :addresses_id",
+ "UPDATE addresses SET user_id=:user_id WHERE "
+ "addresses.id = :addresses_id",
lambda ctx: [{u'addresses_id': a2.id, 'user_id': None}]
),
CompiledSQL(
sess.flush,
CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
{'parent_id': None, 'data': 'n1'}
),
AllOf(
CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
lambda ctx: {'parent_id': n1.id, 'data': 'n2'}
),
CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
lambda ctx: {'parent_id': n1.id, 'data': 'n3'}
),
)
testing.db,
sess.flush,
AllOf(
- CompiledSQL("UPDATE nodes SET parent_id=:parent_id WHERE nodes.id = :nodes_id",
+ CompiledSQL("UPDATE nodes SET parent_id=:parent_id "
+ "WHERE nodes.id = :nodes_id",
lambda ctx: {'nodes_id':n3.id, 'parent_id':None}),
- CompiledSQL("UPDATE nodes SET parent_id=:parent_id WHERE nodes.id = :nodes_id",
+ CompiledSQL("UPDATE nodes SET parent_id=:parent_id "
+ "WHERE nodes.id = :nodes_id",
lambda ctx: {'nodes_id':n2.id, 'parent_id':None}),
),
CompiledSQL("DELETE FROM nodes WHERE nodes.id = :id",
sess.flush,
CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
{'parent_id': None, 'data': 'n1'}
),
AllOf(
CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
lambda ctx: {'parent_id': n1.id, 'data': 'n2'}
),
CompiledSQL(
- "INSERT INTO nodes (parent_id, data) VALUES (:parent_id, :data)",
+ "INSERT INTO nodes (parent_id, data) VALUES "
+ "(:parent_id, :data)",
lambda ctx: {'parent_id': n1.id, 'data': 'n3'}
),
)
sess.add_all([n2, n3])
sess.flush()
- print "-------------------------------------------------"
sess.delete(n1)
sess.delete(n2)
sess.delete(n3)
def test_bidirectional_mutations_one(self):
mapper(Node, nodes, properties={
- 'children':relationship(Node, backref=backref('parent', remote_side=nodes.c.id))
+ 'children':relationship(Node,
+ backref=backref('parent',
+ remote_side=nodes.c.id))
})
sess = create_session()
n1.children
self._assert_uow_size(sess, 2)
-class SingleCyclePlusAttributeTest(_base.MappedTest, testing.AssertsExecutionResults, AssertsUOW):
+class SingleCyclePlusAttributeTest(_base.MappedTest,
+ testing.AssertsExecutionResults, AssertsUOW):
@classmethod
def define_tables(cls, metadata):
Table('nodes', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30))
)
Table('foobars', metadata,
- Column('id', Integer, primary_key=True, test_needs_autoincrement=True),
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
Column('parent_id', Integer, ForeignKey('nodes.id')),
)
sess.flush()
-class SingleCycleM2MTest(_base.MappedTest, testing.AssertsExecutionResults, AssertsUOW):
+class SingleCycleM2MTest(_base.MappedTest,
+ testing.AssertsExecutionResults, AssertsUOW):
@classmethod
def define_tables(cls, metadata):
# so check the end result
sess.flush()
eq_(
- sess.query(node_to_nodes.c.left_node_id, node_to_nodes.c.right_node_id).\
- order_by(node_to_nodes.c.left_node_id, node_to_nodes.c.right_node_id).\
+ sess.query(node_to_nodes.c.left_node_id,
+ node_to_nodes.c.right_node_id).\
+ order_by(node_to_nodes.c.left_node_id,
+ node_to_nodes.c.right_node_id).\
all(),
sorted([
(n1.id, n2.id), (n1.id, n3.id), (n1.id, n4.id),
pass
mapper(Parent, parent, properties={
- 'child':relationship(Child, uselist=False, cascade="all, delete-orphan", backref="parent")
+ 'child':relationship(Child, uselist=False,
+ cascade="all, delete-orphan",
+ backref="parent")
})
mapper(Child, child)